Mercurial > hg > monetdb-java
changeset 71:4e2a2a81cc6a embedded
Some parsing successful on the old mapi. About to start testing table result sets.
line wrap: on
line diff
--- a/src/main/java/nl/cwi/monetdb/embedded/env/MonetDBEmbeddedDatabase.java +++ b/src/main/java/nl/cwi/monetdb/embedded/env/MonetDBEmbeddedDatabase.java @@ -8,7 +8,7 @@ package nl.cwi.monetdb.embedded.env; -import nl.cwi.monetdb.mcl.io.JDBCEmbeddedConnection; +import nl.cwi.monetdb.mcl.connection.embedded.JDBCEmbeddedConnection; import java.util.concurrent.ConcurrentHashMap;
--- a/src/main/java/nl/cwi/monetdb/jdbc/MonetConnection.java +++ b/src/main/java/nl/cwi/monetdb/jdbc/MonetConnection.java @@ -2,14 +2,12 @@ package nl.cwi.monetdb.jdbc; import nl.cwi.monetdb.jdbc.types.INET; import nl.cwi.monetdb.jdbc.types.URL; -import nl.cwi.monetdb.mcl.connection.MCLException; -import nl.cwi.monetdb.mcl.connection.Debugger; -import nl.cwi.monetdb.mcl.connection.MonetDBLanguage; +import nl.cwi.monetdb.mcl.connection.*; +import nl.cwi.monetdb.mcl.connection.socket.MapiLanguage; import nl.cwi.monetdb.mcl.protocol.ProtocolException; import nl.cwi.monetdb.mcl.protocol.AbstractProtocol; import nl.cwi.monetdb.mcl.protocol.ServerResponses; import nl.cwi.monetdb.mcl.responses.*; -import nl.cwi.monetdb.mcl.connection.SendThread; import nl.cwi.monetdb.mcl.responses.DataBlockResponse; import nl.cwi.monetdb.mcl.responses.ResultSetResponse; @@ -62,7 +60,7 @@ public abstract class MonetConnection ex /** the successful processed input properties */ protected final Properties conn_props; /** The language to connect with */ - protected MonetDBLanguage language; + protected IMonetDBLanguage language; /** The database to connect to */ protected String database; /** Authentication hash method */ @@ -83,8 +81,8 @@ public abstract class MonetConnection ex } }; - // See javadoc for documentation about WeakHashMap if you don't know what - // it does !!!NOW!!! (only when you deal with it of course) + // See javadoc for documentation about WeakHashMap if you don't know what it does !!!NOW!!! + // (only when you deal with it of course) /** A Map containing all (active) Statements created from this Connection */ private Map<Statement,?> statements = new WeakHashMap<Statement, Object>(); @@ -101,25 +99,23 @@ public abstract class MonetConnection ex protected AbstractProtocol<?> protocol; /** - * Constructor of a Connection for MonetDB. At this moment the - * current implementation limits itself to storing the given host, - * database, username and password for later use by the - * createStatement() call. This constructor is only accessible to - * classes from the jdbc package. + * Constructor of a Connection for MonetDB. At this moment the current implementation limits itself to storing the + * given host, database, username and password for later use by the createStatement() call. This constructor is + * only accessible to classes from the jdbc package. * * @throws IOException if an error occurs */ - public MonetConnection(Properties props, String database, String hash, String language, boolean blobIsBinary, - boolean isDebugging) throws IOException { + public MonetConnection(Properties props, String database, String hash, IMonetDBLanguage language, + boolean blobIsBinary, boolean isDebugging) throws IOException { this.conn_props = props; this.database = database; this.hash = hash; - this.language = MonetDBLanguage.GetLanguageFromString(language); + this.language = language; this.blobIsBinary = blobIsBinary; this.isDebugging = isDebugging; } - public MonetDBLanguage getLanguage() { + public IMonetDBLanguage getLanguage() { return language; } @@ -127,19 +123,24 @@ public abstract class MonetConnection ex this.closed = closed; } + public boolean isDebugging() { + return isDebugging; + } + public void setDebugging(String filename) throws IOException { ourSavior = new Debugger(filename); } + public Debugger getOurSavior() { + return ourSavior; + } + /** - * Connects to the given host and port, logging in as the given - * user. If followRedirect is false, a RedirectionException is - * thrown when a redirect is encountered. + * Connects to the given host and port, logging in as the given user. If followRedirect is false, a + * RedirectionException is thrown when a redirect is encountered. * - * @return A List with informational (warning) messages. If this - * list is empty; then there are no warnings. - * @throws IOException if an I/O error occurs when creating the - * socket + * @return A List with informational (warning) messages. If this list is empty; then there are no warnings. + * @throws IOException if an I/O error occurs when creating the socket * @throws ProtocolException if bogus data is received * @throws MCLException if an MCL related error occurs */ @@ -159,51 +160,51 @@ public abstract class MonetConnection ex return this.protocol; } + public abstract void sendControlCommand(ControlCommands con, int data) throws SQLException; + /** - * Releases this Connection object's database and JDBC resources - * immediately instead of waiting for them to be automatically - * released. All Statements created from this Connection will be - * closed when this method is called. + * Releases this Connection object's database and JDBC resources immediately instead of waiting for them to be + * automatically released. All Statements created from this Connection will be closed when this method is called. * - * Calling the method close on a Connection object that is already - * closed is a no-op. + * Calling the method close on a Connection object that is already closed is a no-op. */ @Override - public synchronized void close() { - for (Statement st : statements.keySet()) { - try { - st.close(); - } catch (SQLException e) { - // better luck next time! - } - } - //close the debugger - try { - if (ourSavior != null) { - ourSavior.close(); + public void close() { + synchronized(protocol) { + for (Statement st : statements.keySet()) { + try { + st.close(); + } catch (SQLException e) { + // better luck next time! + } } - } catch (IOException e) { - // ignore it + //close the debugger + try { + if (ourSavior != null) { + ourSavior.close(); + } + } catch (IOException e) { + // ignore it + } + // close the socket or the embedded server + try { + this.closeUnderlyingConnection(); + } catch (IOException e) { + // ignore it + } + // close active SendThread if any + if (sendThread != null) { + sendThread.shutdown(); + sendThread = null; + } + // report ourselves as closed + closed = true; } - // close the socket or the embedded server - try { - this.closeUnderlyingConnection(); - } catch (IOException e) { - // ignore it - } - // close active SendThread if any - if (sendThread != null) { - sendThread.shutdown(); - sendThread = null; - } - // report ourselves as closed - closed = true; } /** - * Destructor called by garbage collector before destroying this - * object tries to disconnect the MonetDB connection if it has not - * been disconnected already. + * Destructor called by garbage collector before destroying this object tries to disconnect the MonetDB connection + * if it has not been disconnected already. */ @Override protected void finalize() throws Throwable { @@ -214,9 +215,8 @@ public abstract class MonetConnection ex //== methods of interface Connection /** - * Clears all warnings reported for this Connection object. After a - * call to this method, the method getWarnings returns null until a - * new warning is reported for this Connection object. + * Clears all warnings reported for this Connection object. After a call to this method, the method getWarnings + * returns null until a new warning is reported for this Connection object. */ @Override public void clearWarnings() { @@ -235,54 +235,44 @@ public abstract class MonetConnection ex } /** - * Makes all changes made since the previous commit/rollback - * permanent and releases any database locks currently held by this - * Connection object. This method should be used only when - * auto-commit mode has been disabled. + * Makes all changes made since the previous commit/rollback permanent and releases any database locks currently + * held by this Connection object. This method should be used only when auto-commit mode has been disabled. * - * @throws SQLException if a database access error occurs or this - * Connection object is in auto-commit mode + * @throws SQLException if a database access error occurs or this Connection object is in auto-commit mode * @see #setAutoCommit(boolean) */ @Override public void commit() throws SQLException { - // note: can't use sendIndependentCommand here because we need - // to process the auto_commit state the server gives + // note: can't use sendIndependentCommand here because we need to process the auto_commit state the server gives this.createResponseList("COMMIT"); } /** - * Creates a Statement object for sending SQL statements to the - * database. SQL statements without parameters are normally - * executed using Statement objects. If the same SQL statement is - * executed many times, it may be more efficient to use a - * PreparedStatement object. + * Creates a Statement object for sending SQL statements to the database. SQL statements without parameters are + * normally executed using Statement objects. If the same SQL statement is executed many times, it may be more + * efficient to use a PreparedStatement object. * - * Result sets created using the returned Statement object will by - * default be type TYPE_FORWARD_ONLY and have a concurrency level of - * CONCUR_READ_ONLY. + * Result sets created using the returned Statement object will by default be type TYPE_FORWARD_ONLY and have a + * concurrency level of CONCUR_READ_ONLY. * * @return a new default Statement object * @throws SQLException if a database access error occurs */ @Override public Statement createStatement() throws SQLException { - return createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); + return createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, + ResultSet.HOLD_CURSORS_OVER_COMMIT); } /** - * Creates a Statement object that will generate ResultSet objects - * with the given type and concurrency. This method is the same as - * the createStatement method above, but it allows the default - * result set type and concurrency to be overridden. + * Creates a Statement object that will generate ResultSet objects with the given type and concurrency. This method + * is the same as the createStatement method above, but it allows the default result set type and concurrency to be + * overridden. * - * @param resultSetType a result set type; one of - * ResultSet.TYPE_FORWARD_ONLY, ResultSet.TYPE_SCROLL_INSENSITIVE, - * or ResultSet.TYPE_SCROLL_SENSITIVE - * @param resultSetConcurrency a concurrency type; one of - * ResultSet.CONCUR_READ_ONLY or ResultSet.CONCUR_UPDATABLE - * @return a new Statement object that will generate ResultSet objects with - * the given type and concurrency + * @param resultSetType a result set type; one of ResultSet.TYPE_FORWARD_ONLY, ResultSet.TYPE_SCROLL_INSENSITIVE, + * or ResultSet.TYPE_SCROLL_SENSITIVE + * @param resultSetConcurrency a concurrency type; one of ResultSet.CONCUR_READ_ONLY or ResultSet.CONCUR_UPDATABLE + * @return a new Statement object that will generate ResultSet objects with the given type and concurrency * @throws SQLException if a database access error occurs */ @Override @@ -380,7 +370,7 @@ public abstract class MonetConnection ex */ @Override public DatabaseMetaData getMetaData() throws SQLException { - if (this.language != MonetDBLanguage.LANG_SQL) { + if (this.language != MapiLanguage.LANG_SQL) { throw new SQLException("This method is only supported in SQL mode", "M0M04"); } return new MonetDatabaseMetaData(this); @@ -719,7 +709,7 @@ public abstract class MonetConnection ex @Override public void setAutoCommit(boolean autoCommit) throws SQLException { if (this.autoCommit != autoCommit) { - this.sendControlCommand("auto_commit " + (autoCommit ? "1" : "0")); + this.sendControlCommand(ControlCommands.AUTO_COMMIT, (autoCommit ? 1 : 0)); this.autoCommit = autoCommit; } } @@ -1320,9 +1310,9 @@ public abstract class MonetConnection ex * @throws SQLException if an IO exception or a database error occurs */ public void sendIndependentCommand(String command) throws SQLException { - synchronized (this) { + synchronized (protocol) { try { - protocol.writeNextCommand(language.getQueryTemplateIndex(0), command.getBytes(), language.getQueryTemplateIndex(1)); + protocol.writeNextQuery(language.getQueryTemplateIndex(0), command, language.getQueryTemplateIndex(1)); protocol.waitUntilPrompt(); if (protocol.getCurrentServerResponseHeader() == ServerResponses.ERROR) { String error = protocol.getRemainingStringLine(0); @@ -1338,34 +1328,6 @@ public abstract class MonetConnection ex } /** - * Sends the given string to MonetDB as control statement, making - * sure there is a prompt after the command is sent. All possible - * returned information is discarded. Encountered errors are - * reported. - * - * @param command the exact string to send to MonetDB - * @throws SQLException if an IO exception or a database error occurs - */ - public void sendControlCommand(String command) throws SQLException { - // send X command - synchronized (this) { - try { - protocol.writeNextCommand(language.getCommandTemplateIndex(0), command.getBytes(), language.getCommandTemplateIndex(1)); - protocol.waitUntilPrompt(); - if (protocol.getCurrentServerResponseHeader() == ServerResponses.ERROR) { - String error = protocol.getRemainingStringLine(0); - throw new SQLException(error.substring(6), error.substring(0, 5)); - } - } catch (SocketTimeoutException e) { - close(); // JDBC 4.1 semantics, abort() - throw new SQLException("connection timed out", "08M33"); - } catch (IOException e) { - throw new SQLException(e.getMessage(), "08000"); - } - } - } - - /** * Adds a warning to the pile of warnings this Connection object * has. If there were no warnings (or clearWarnings was called) * this warning will be the first, otherwise this warning will get @@ -1532,198 +1494,202 @@ public abstract class MonetConnection ex * @throws SQLException if a database error occurs */ @SuppressWarnings("fallthrough") - public void executeQuery(byte[][] templ, String query) throws SQLException { + public void executeQuery(String[] templ, String query) throws SQLException { String error = null; try { - // make sure we're ready to send query; read data till we - // have the prompt it is possible (and most likely) that we - // already have the prompt and do not have to skip any - // lines. Ignore errors from previous result sets. - protocol.waitUntilPrompt(); + synchronized (protocol) { + // make sure we're ready to send query; read data till we + // have the prompt it is possible (and most likely) that we + // already have the prompt and do not have to skip any + // lines. Ignore errors from previous result sets. + protocol.waitUntilPrompt(); - // {{{ set reply size - /** - * Change the reply size of the server. If the given - * value is the same as the current value known to use, - * then ignore this call. If it is set to 0 we get a - * prompt after the server sent it's header. - */ - int size = cachesize == 0 ? DEF_FETCHSIZE : cachesize; - size = maxrows != 0 ? Math.min(maxrows, size) : size; - // don't do work if it's not needed - if (language == MonetDBLanguage.LANG_SQL && size != curReplySize && - !Arrays.deepEquals(templ, language.getCommandTemplates())) { - sendControlCommand("reply_size " + size); + // {{{ set reply size + /** + * Change the reply size of the server. If the given + * value is the same as the current value known to use, + * then ignore this call. If it is set to 0 we get a + * prompt after the server sent it's header. + */ + int size = cachesize == 0 ? DEF_FETCHSIZE : cachesize; + size = maxrows != 0 ? Math.min(maxrows, size) : size; + // don't do work if it's not needed + if (language == MapiLanguage.LANG_SQL && size != curReplySize && + !Arrays.deepEquals(templ, language.getCommandTemplates())) { + sendControlCommand(ControlCommands.REPLY_SIZE, size); - // store the reply size after a successful change - curReplySize = size; - } - // }}} set reply size + // store the reply size after a successful change + curReplySize = size; + } + // }}} set reply size - // If the query is larger than the TCP buffer size, use a - // special send thread to avoid deadlock with the server due - // to blocking behaviour when the buffer is full. Because - // the server will be writing back results to us, it will - // eventually block as well when its TCP buffer gets full, - // as we are blocking an not consuming from it. The result - // is a state where both client and server want to write, - // but block. - if (query.length() > getBlockSize()) { - // get a reference to the send thread - if (sendThread == null) { - sendThread = new SendThread(protocol); + // If the query is larger than the TCP buffer size, use a + // special send thread to avoid deadlock with the server due + // to blocking behaviour when the buffer is full. Because + // the server will be writing back results to us, it will + // eventually block as well when its TCP buffer gets full, + // as we are blocking an not consuming from it. The result + // is a state where both client and server want to write, + // but block. + if (query.length() > getBlockSize()) { + // get a reference to the send thread + if (sendThread == null) { + sendThread = new SendThread(protocol); + } + // tell it to do some work! + sendThread.runQuery(templ, query); + } else { + // this is a simple call, which is a lot cheaper and will + // always succeed for small queries. + protocol.writeNextQuery((templ[0] == null) ? "" : templ[0], query, + (templ[1] == null) ? "" : templ[1]); } - // tell it to do some work! - sendThread.runQuery(templ, query); - } else { - // this is a simple call, which is a lot cheaper and will - // always succeed for small queries. - protocol.writeNextCommand((templ[0] == null) ? MonetDBLanguage.EmptyString : templ[0], - query.getBytes(), (templ[1] == null) ? MonetDBLanguage.EmptyString : templ[1]); - } - // go for new results - protocol.fetchNextResponseData(); - ServerResponses nextResponse = protocol.getCurrentServerResponseHeader(); - IResponse res = null; - while (nextResponse != ServerResponses.PROMPT) { - // each response should start with a start of header (or error) - switch (nextResponse) { - case SOHEADER: - // make the response object, and fill it - try { - switch (protocol.getNextStarterHeader()) { - case Q_PARSE: - throw new ProtocolException("Q_PARSE header not allowed here", 1); - case Q_TABLE: - case Q_PREPARE: { - res = protocol.getNextResultSetResponse(MonetConnection.this, - ResponseList.this, this.seqnr); - ResultSetResponse rsreponse = (ResultSetResponse) res; - // only add this resultset to - // the hashmap if it can possibly - // have an additional datablock - if (rsreponse.getRowcount() < rsreponse.getTuplecount()) { - if (rsresponses == null) { - rsresponses = new HashMap<>(); + // go for new results + protocol.fetchNextResponseData(); //&1 0 27 1 27 + ServerResponses nextResponse = protocol.getCurrentServerResponseHeader(); + IResponse res = null; + while (nextResponse != ServerResponses.PROMPT) { + // each response should start with a start of header (or error) + switch (nextResponse) { + case SOHEADER: + // make the response object, and fill it + try { + switch (protocol.getNextStarterHeader()) { + case Q_PARSE: + throw new ProtocolException("Q_PARSE header not allowed here", 1); + case Q_TABLE: + case Q_PREPARE: { + res = protocol.getNextResultSetResponse(MonetConnection.this, + ResponseList.this, this.seqnr); + ResultSetResponse rsreponse = (ResultSetResponse) res; + // only add this resultset to + // the hashmap if it can possibly + // have an additional datablock + if (rsreponse.getRowcount() < rsreponse.getTuplecount()) { + if (rsresponses == null) { + rsresponses = new HashMap<>(); + } + rsresponses.put(rsreponse.getId(), rsreponse); + } + } + break; + case Q_UPDATE: + res = protocol.getNextUpdateResponse(); + break; + case Q_SCHEMA: + res = protocol.getNextSchemaResponse(); + break; + case Q_TRANS: + res = protocol.getNextAutoCommitResponse(); + boolean isAutoCommit = ((AutoCommitResponse) res).isAutocommit(); + + if (MonetConnection.this.getAutoCommit() && isAutoCommit) { + MonetConnection.this.addWarning("Server enabled auto commit mode " + + "while local state already was auto commit.", "01M11"); } - rsresponses.put(rsreponse.getId(), rsreponse); + MonetConnection.this.autoCommit = isAutoCommit; + break; + case Q_BLOCK: { + DataBlockResponse next = protocol.getNextDatablockResponse(rsresponses); + if (next == null) { + error = "M0M12!No ResultSetResponse for a DataBlock found"; + break; + } + res = next; } - } break; - case Q_UPDATE: - res = protocol.getNextUpdateResponse(); - break; - case Q_SCHEMA: - res = protocol.getNextSchemaResponse(); break; - case Q_TRANS: - res = protocol.getNextAutoCommitResponse(); - boolean isAutoCommit = ((AutoCommitResponse)res).isAutocommit(); + } + } catch (ProtocolException e) { + error = "M0M10!error while parsing start of header:\n" + e.getMessage() + " found: '" + + protocol.getRemainingStringLine(0).charAt(e.getErrorOffset()) + "'" + + " in: \"" + protocol.getRemainingStringLine(0) + "\"" + " at pos: " + + e.getErrorOffset(); + // flush all the rest + protocol.waitUntilPrompt(); + nextResponse = protocol.getCurrentServerResponseHeader(); + break; + } - if (MonetConnection.this.getAutoCommit() && isAutoCommit) { - MonetConnection.this.addWarning("Server enabled auto commit mode " + - "while local state already was auto commit.", "01M11"); - } - MonetConnection.this.autoCommit = isAutoCommit; - break; - case Q_BLOCK: { - DataBlockResponse next = protocol.getNextDatablockResponse(rsresponses); - if (next == null) { - error = "M0M12!No ResultSetResponse for a DataBlock found"; + // immediately handle errors after parsing the header (res may be null) + if (error != null) { + protocol.waitUntilPrompt(); + nextResponse = protocol.getCurrentServerResponseHeader(); + break; + } + + // here we have a res object, which we can start filling + if (res instanceof IIncompleteResponse) { + IIncompleteResponse iter = (IIncompleteResponse) res; + while (iter.wantsMore()) { + try { + protocol.fetchNextResponseData(); + iter.addLine(protocol.getCurrentServerResponseHeader(), protocol.getCurrentData()); + } catch (ProtocolException ex) { + // right, some protocol violation, skip the rest of the result + error = "M0M10!" + ex.getMessage(); + protocol.waitUntilPrompt(); + nextResponse = protocol.getCurrentServerResponseHeader(); break; } - res = next; - } break; + } + } + + if (error != null) { + break; + } + + // it is of no use to store DataBlockResponses, you never want to + // retrieve them directly anyway + if (!(res instanceof DataBlockResponse)) { + responses.add(res); } - } catch (ProtocolException e) { - error = "M0M10!error while parsing start of header:\n" + e.getMessage() + " found: '" - + protocol.getRemainingStringLine(0).charAt(e.getErrorOffset()) + "'" + - " in: \"" + protocol.getRemainingStringLine(0) + "\"" + " at pos: " - + e.getErrorOffset(); - // flush all the rest + // read the next line (can be prompt, new result, error, etc.) before we start the loop over + protocol.fetchNextResponseData(); + nextResponse = protocol.getCurrentServerResponseHeader(); + break; + case INFO: + addWarning(protocol.getRemainingStringLine(1), "01000"); + // read the next line (can be prompt, new result, error, etc.) before we start the loop over + protocol.fetchNextResponseData(); + nextResponse = protocol.getCurrentServerResponseHeader(); + break; + case ERROR: + // read everything till the prompt (should be error) we don't know if we ignore some + // garbage here... but the log should reveal that + error = protocol.getRemainingStringLine(1); protocol.waitUntilPrompt(); nextResponse = protocol.getCurrentServerResponseHeader(); break; - } - - // immediately handle errors after parsing the header (res may be null) - if (error != null) { - protocol.waitUntilPrompt(); - nextResponse = protocol.getCurrentServerResponseHeader(); - break; - } - - // here we have a res object, which we can start filling - if(res instanceof IIncompleteResponse) { - IIncompleteResponse iter = (IIncompleteResponse) res; - while (iter.wantsMore()) { - try { - protocol.fetchNextResponseData(); - iter.addLine(protocol.getCurrentServerResponseHeader(), protocol.getCurrentData()); - } catch (ProtocolException ex) { - // right, some protocol violation, skip the rest of the result - error = "M0M10!" + ex.getMessage(); - protocol.waitUntilPrompt(); - nextResponse = protocol.getCurrentServerResponseHeader(); - break; - } - } - } - - if (error != null) { - break; - } - - // it is of no use to store DataBlockResponses, you never want to - // retrieve them directly anyway - if (!(res instanceof DataBlockResponse)) { - responses.add(res); - } - // read the next line (can be prompt, new result, error, etc.) before we start the loop over - protocol.fetchNextResponseData(); - nextResponse = protocol.getCurrentServerResponseHeader(); - break; - case INFO: - addWarning(protocol.getRemainingStringLine(1), "01000"); - // read the next line (can be prompt, new result, error, etc.) before we start the loop over - protocol.fetchNextResponseData(); - nextResponse = protocol.getCurrentServerResponseHeader(); - break; - case ERROR: - // read everything till the prompt (should be error) we don't know if we ignore some - // garbage here... but the log should reveal that - protocol.waitUntilPrompt(); - nextResponse = protocol.getCurrentServerResponseHeader(); - error = protocol.getRemainingStringLine(1); - break; - default: - throw new SQLException("!M0M10!protocol violation, unexpected line!"); - } - } - - // if we used the sendThread, make sure it has finished - if (sendThread != null) { - String tmp = sendThread.getErrors(); - if (tmp != null) { - if (error == null) { - error = "08000!" + tmp; - } else { - error += "\n08000!" + tmp; + default: + throw new SQLException("Protocol violation, unexpected line!", "M0M10"); } } - } - if (error != null) { - SQLException ret = null; - String[] errors = error.split("\n"); - for (String error1 : errors) { - if (ret == null) { - ret = new SQLException(error1.substring(6), error1.substring(0, 5)); - } else { - ret.setNextException(new SQLException(error1.substring(6), error1.substring(0, 5))); + + // if we used the sendThread, make sure it has finished + if (sendThread != null) { + String tmp = sendThread.getErrors(); + if (tmp != null) { + if (error == null) { + error = "08000!" + tmp; + } else { + error += "\n08000!" + tmp; + } } } - throw ret; + if (error != null) { + SQLException ret = null; + String[] errors = error.split("\n"); + for (String error1 : errors) { + if (ret == null) { + ret = new SQLException(error1.substring(6), error1.substring(0, 5)); + } else { + ret.setNextException(new SQLException(error1.substring(6), error1.substring(0, 5))); + } + } + throw ret; + } } } catch (SocketTimeoutException e) { this.close(); // JDBC 4.1 semantics, abort()
--- a/src/main/java/nl/cwi/monetdb/jdbc/MonetPreparedStatement.java +++ b/src/main/java/nl/cwi/monetdb/jdbc/MonetPreparedStatement.java @@ -8,6 +8,7 @@ package nl.cwi.monetdb.jdbc; +import nl.cwi.monetdb.mcl.connection.ControlCommands; import nl.cwi.monetdb.mcl.responses.ResultSetResponse; import java.io.InputStream; @@ -2368,7 +2369,7 @@ public class MonetPreparedStatement exte public void close() { try { if (!closed && id != -1) - connection.sendControlCommand("release " + id); + connection.sendControlCommand(ControlCommands.RELEASE, id); } catch (SQLException e) { // probably server closed connection }
--- a/src/main/java/nl/cwi/monetdb/jdbc/MonetStatement.java +++ b/src/main/java/nl/cwi/monetdb/jdbc/MonetStatement.java @@ -199,7 +199,7 @@ public class MonetStatement extends Mone BatchUpdateException e = new BatchUpdateException("Error(s) occurred while executing the batch, see next SQLExceptions for details", "22000", counts); StringBuilder tmpBatch = new StringBuilder(connection.getBlockSize()); - String sep = new String(connection.getLanguage().getQueryTemplateIndex(2)); + String sep = connection.getLanguage().getQueryTemplateIndex(2); for (int i = 0; i < batch.size(); i++) { String tmp = batch.get(i); if (sep.length() + tmp.length() > connection.getBlockSize()) {
rename from src/main/java/nl/cwi/monetdb/util/ChannelSecurity.java rename to src/main/java/nl/cwi/monetdb/mcl/connection/ChannelSecurity.java --- a/src/main/java/nl/cwi/monetdb/util/ChannelSecurity.java +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/ChannelSecurity.java @@ -1,6 +1,5 @@ -package nl.cwi.monetdb.util; +package nl.cwi.monetdb.mcl.connection; -import java.io.UnsupportedEncodingException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; @@ -12,8 +11,7 @@ public class ChannelSecurity { private static char HexChar(int n) { return (n > 9) ? (char) ('a' + (n - 10)) : (char) ('0' + n); } /** - * Small helper method to convert a byte string to a hexadecimal - * string representation. + * Small helper method to convert a byte string to a hexadecimalstring representation. * * @param digest the byte array to convert * @return the byte array as hexadecimal string @@ -28,15 +26,14 @@ public class ChannelSecurity { return new String(result); } - public static String DigestStrings(String algorithm, String... toDigests) { + public static String DigestStrings(String algorithm, byte[]... toDigests) { try { MessageDigest md = MessageDigest.getInstance(algorithm); - for (String str : toDigests) { - md.update(str.getBytes("UTF-8")); + for (byte[] str : toDigests) { + md.update(str); } - byte[] digest = md.digest(); - return ToHex(digest); - } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) { + return ToHex(md.digest()); + } catch (NoSuchAlgorithmException e) { throw new AssertionError("internal error: " + e.toString()); } }
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/ControlCommands.java @@ -0,0 +1,16 @@ +package nl.cwi.monetdb.mcl.connection; + +/** + * Created by ferreira on 12/9/16. + */ +public enum ControlCommands { + + /** Send autocommit statement */ + AUTO_COMMIT, + /** Set reply size for the server */ + REPLY_SIZE, + /** Release a prepared statement data */ + RELEASE, + /** Close a query */ + CLOSE +}
--- a/src/main/java/nl/cwi/monetdb/mcl/connection/Debugger.java +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/Debugger.java @@ -22,6 +22,10 @@ public class Debugger implements Closeab this.log = log; } + public Writer getLog() { + return log; + } + /** * Enables logging to a file what is read and written from and to * the server. Logging can be enabled at any time. However, it is @@ -56,7 +60,7 @@ public class Debugger implements Closeab * @param message the message to log * @throws IOException if an IO error occurs while writing to the logfile */ - private void logTx(String message) throws IOException { + public void logTx(String message) throws IOException { log.write("TX " + System.currentTimeMillis() + ": " + message + "\n"); } @@ -68,7 +72,7 @@ public class Debugger implements Closeab * @param message the message to log * @throws IOException if an IO error occurs while writing to the logfile */ - private void logTd(String message) throws IOException { + public void logTd(String message) throws IOException { log.write("TD " + System.currentTimeMillis() + ": " + message + "\n"); } @@ -83,7 +87,7 @@ public class Debugger implements Closeab * @param message the message to log * @throws IOException if an IO error occurs while writing to the logfile */ - private void logRx(String message) throws IOException { + public void logRx(String message) throws IOException { log.write("RX " + System.currentTimeMillis() + ": " + message + "\n"); log.flush(); } @@ -97,7 +101,7 @@ public class Debugger implements Closeab * @param message the message to log * @throws IOException if an IO error occurs while writing to the logfile */ - private void logRd(String message) throws IOException { + public void logRd(String message) throws IOException { log.write("RD " + System.currentTimeMillis() + ": " + message + "\n"); log.flush(); }
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/IMonetDBLanguage.java @@ -0,0 +1,17 @@ +package nl.cwi.monetdb.mcl.connection; + +/** + * Created by ferreira on 12/9/16. + */ +public interface IMonetDBLanguage { + + String getQueryTemplateIndex(int index); + + String getCommandTemplateIndex(int index); + + String[] getQueryTemplates(); + + String[] getCommandTemplates(); + + String getRepresentation(); +}
--- a/src/main/java/nl/cwi/monetdb/mcl/connection/MCLException.java +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/MCLException.java @@ -16,11 +16,11 @@ public class MCLException extends Except private static final long serialVersionUID = 1L; - MCLException(String e) { + public MCLException(String e) { super(e); } - MCLException(Throwable t) { + public MCLException(Throwable t) { super(t); } }
--- a/src/main/java/nl/cwi/monetdb/mcl/connection/MonetDBConnectionFactory.java +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/MonetDBConnectionFactory.java @@ -2,6 +2,9 @@ package nl.cwi.monetdb.mcl.connection; import nl.cwi.monetdb.jdbc.MonetConnection; import nl.cwi.monetdb.jdbc.MonetDriver; +import nl.cwi.monetdb.mcl.connection.embedded.EmbeddedConnection; +import nl.cwi.monetdb.mcl.connection.socket.MapiConnection; +import nl.cwi.monetdb.mcl.connection.socket.MapiLanguage; import nl.cwi.monetdb.mcl.protocol.ProtocolException; import java.io.File; @@ -74,6 +77,7 @@ public final class MonetDBConnectionFact try { sockTimeout = Integer.parseInt(timout); } catch (NumberFormatException e) { + sockTimeout = 0; failedparse2 = true; props.setProperty("so_timeout", "0"); } @@ -154,7 +158,7 @@ public final class MonetDBConnectionFact } //set the timezone - if (res.getLanguage() == MonetDBLanguage.LANG_SQL) { + if (res.getLanguage() == MapiLanguage.LANG_SQL) { // enable auto commit res.setAutoCommit(true); @@ -167,7 +171,6 @@ public final class MonetDBConnectionFact offset -= (offset / 60) * 60; tz += (offset < 10 ? "0" : "") + offset; - // TODO check this res.sendIndependentCommand("SET TIME ZONE INTERVAL '" + tz + "' HOUR TO MINUTE"); }
--- a/src/main/java/nl/cwi/monetdb/mcl/connection/SendThread.java +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/SendThread.java @@ -30,12 +30,11 @@ public class SendThread extends Thread { SHUTDOWN } - private byte[][] templ; - private byte[] query; + private String[] templ; + private String query; private AbstractProtocol protocol; private String error; private SendThreadStatus state = SendThreadStatus.WAIT; - private final Lock sendLock = new ReentrantLock(); private final Condition queryAvailable = sendLock.newCondition(); private final Condition waiting = sendLock.newCondition(); @@ -54,34 +53,34 @@ public class SendThread extends Thread { @Override public void run() { - sendLock.lock(); + this.sendLock.lock(); try { while (true) { - while (state == SendThreadStatus.WAIT) { + while (this.state == SendThreadStatus.WAIT) { try { - queryAvailable.await(); + this.queryAvailable.await(); } catch (InterruptedException e) { // woken up, eh? } } - if (state == SendThreadStatus.SHUTDOWN) + if (this.state == SendThreadStatus.SHUTDOWN) break; // state is QUERY here try { - protocol.writeNextCommand((templ[0] == null ? MonetDBLanguage.EmptyString : templ[0]), query, - (templ[1] == null ? MonetDBLanguage.EmptyString : templ[1])); + this.protocol.writeNextQuery((templ[0] == null ? "" : templ[0]), query, + (templ[1] == null ? "" : templ[1])); } catch (IOException e) { - error = e.getMessage(); + this.error = e.getMessage(); } // update our state, and notify, maybe someone is waiting // for us in throwErrors - state = SendThreadStatus.WAIT; - waiting.signal(); + this.state = SendThreadStatus.WAIT; + this.waiting.signal(); } } finally { - sendLock.unlock(); + this.sendLock.unlock(); } } @@ -93,21 +92,19 @@ public class SendThread extends Thread { * @param query the query itself * @throws SQLException if this SendThread is already in use */ - public void runQuery(byte[][] templ, String query) throws SQLException { - sendLock.lock(); + public void runQuery(String[] templ, String query) throws SQLException { + this.sendLock.lock(); try { - if (state != SendThreadStatus.WAIT) { + if (this.state != SendThreadStatus.WAIT) { throw new SQLException("SendThread already in use or shutting down!", "M0M03"); } - this.templ = templ; - this.query = query.getBytes(); - + this.query = query; // let the thread know there is some work to do - state = SendThreadStatus.QUERY; - queryAvailable.signal(); + this.state = SendThreadStatus.QUERY; + this.queryAvailable.signal(); } finally { - sendLock.unlock(); + this.sendLock.unlock(); } } @@ -117,20 +114,20 @@ public class SendThread extends Thread { * @return the errors or null if none */ public String getErrors() { - sendLock.lock(); + this.sendLock.lock(); try { // make sure the thread is in WAIT state, not QUERY - while (state == SendThreadStatus.QUERY) { + while (this.state == SendThreadStatus.QUERY) { try { - waiting.await(); + this.waiting.await(); } catch (InterruptedException e) { // just try again } } - if (state == SendThreadStatus.SHUTDOWN) - error = "SendThread is shutting down"; + if (this.state == SendThreadStatus.SHUTDOWN) + this.error = "SendThread is shutting down"; } finally { - sendLock.unlock(); + this.sendLock.unlock(); } return error; }
rename from src/main/java/nl/cwi/monetdb/mcl/connection/EmbeddedConnection.java rename to src/main/java/nl/cwi/monetdb/mcl/connection/embedded/EmbeddedConnection.java --- a/src/main/java/nl/cwi/monetdb/mcl/connection/EmbeddedConnection.java +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/embedded/EmbeddedConnection.java @@ -1,13 +1,16 @@ -package nl.cwi.monetdb.mcl.connection; +package nl.cwi.monetdb.mcl.connection.embedded; import nl.cwi.monetdb.embedded.env.MonetDBEmbeddedConnection; import nl.cwi.monetdb.embedded.env.MonetDBEmbeddedDatabase; import nl.cwi.monetdb.embedded.env.MonetDBEmbeddedException; import nl.cwi.monetdb.jdbc.MonetConnection; +import nl.cwi.monetdb.mcl.connection.ControlCommands; +import nl.cwi.monetdb.mcl.connection.MCLException; import nl.cwi.monetdb.mcl.protocol.ProtocolException; import nl.cwi.monetdb.mcl.protocol.embedded.EmbeddedProtocol; import java.io.*; +import java.sql.SQLException; import java.util.List; import java.util.Properties; @@ -18,9 +21,9 @@ public final class EmbeddedConnection ex private final String directory; - EmbeddedConnection(Properties props, String database, String hash, String language, boolean blobIsBinary, - boolean isDebugging, String directory) throws IOException { - super(props, database, hash, language, blobIsBinary, isDebugging); + public EmbeddedConnection(Properties props, String database, String hash, String language, boolean blobIsBinary, + boolean isDebugging, String directory) throws IOException { + super(props, database, hash, EmbeddedLanguage.GetLanguageFromString(language), blobIsBinary, isDebugging); this.directory = directory; } @@ -33,7 +36,7 @@ public final class EmbeddedConnection ex } @Override - public List<String> connect(String user, String pass) throws IOException, ProtocolException, MCLException { + public List<String> connect(String username, String password) throws IOException, ProtocolException, MCLException { try { if(MonetDBEmbeddedDatabase.IsDatabaseRunning() && !MonetDBEmbeddedDatabase.GetDatabaseDirectory().equals(this.directory)) { @@ -73,4 +76,11 @@ public final class EmbeddedConnection ex public void closeUnderlyingConnection() throws IOException { ((EmbeddedProtocol)protocol).getEmbeddedConnection().closeConnection(); } + + @Override + public void sendControlCommand(ControlCommands con, int data) throws SQLException { + synchronized (protocol) { + + } + } }
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/embedded/EmbeddedLanguage.java @@ -0,0 +1,57 @@ +package nl.cwi.monetdb.mcl.connection.embedded; + +import nl.cwi.monetdb.mcl.connection.IMonetDBLanguage; + +/** + * Created by ferreira on 12/9/16. + */ +public enum EmbeddedLanguage implements IMonetDBLanguage { + + /** the SQL language */ + LANG_SQL(new String[]{"", "\n;", "\n;\n"}, "sql"), + /** an unknown language */ + LANG_UNKNOWN(null, "unknown"); + + EmbeddedLanguage(String[] queryTemplates, String representation) { + this.queryTemplates = queryTemplates; + this.representation = representation; + } + + private final String[] queryTemplates; + + private final String representation; + + @Override + public String getQueryTemplateIndex(int index) { + return queryTemplates[index]; + } + + @Override + public String getCommandTemplateIndex(int index) { + return null; + } + + @Override + public String[] getQueryTemplates() { + return queryTemplates; + } + + @Override + public String[] getCommandTemplates() { + return null; + } + + @Override + public String getRepresentation() { + return representation; + } + + public static EmbeddedLanguage GetLanguageFromString(String language) { + switch (language) { + case "sql": + return LANG_SQL; + default: + return LANG_UNKNOWN; + } + } +}
rename from src/main/java/nl/cwi/monetdb/mcl/io/JDBCEmbeddedConnection.java rename to src/main/java/nl/cwi/monetdb/mcl/connection/embedded/JDBCEmbeddedConnection.java --- a/src/main/java/nl/cwi/monetdb/mcl/io/JDBCEmbeddedConnection.java +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/embedded/JDBCEmbeddedConnection.java @@ -1,4 +1,4 @@ -package nl.cwi.monetdb.mcl.io; +package nl.cwi.monetdb.mcl.connection.embedded; import nl.cwi.monetdb.embedded.env.MonetDBEmbeddedConnection;
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/socket/AbstractSocket.java @@ -0,0 +1,141 @@ +package nl.cwi.monetdb.mcl.connection.socket; + +import java.io.Closeable; +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.CharBuffer; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; + +/** + * Created by ferreira on 12/9/16. + */ +public abstract class AbstractSocket implements Closeable { + + protected final Socket socket; + + protected final MapiConnection connection; + + private final ByteBuffer bufferIn; + + private final ByteBuffer bufferOut; + + private final CharBuffer stringsEncoded; + + private final CharBuffer stringsDecoded; + + private final CharsetEncoder asciiEncoder = StandardCharsets.UTF_8.newEncoder(); + + private final CharsetDecoder asciiDecoder = StandardCharsets.UTF_8.newDecoder(); + + private boolean hasFinished; + + public AbstractSocket(String hostname, int port, MapiConnection connection) throws IOException { + this.socket = new Socket(hostname, port); + this.connection = connection; + this.bufferIn = ByteBuffer.wrap(new byte[getBlockSize()]); + this.bufferOut = ByteBuffer.wrap(new byte[getBlockSize()]); + this.stringsEncoded = CharBuffer.allocate(getBlockSize()); + this.stringsDecoded = CharBuffer.allocate(getBlockSize()); + this.stringsDecoded.flip(); + } + + public int getSoTimeout() throws SocketException { + return socket.getSoTimeout(); + } + + public void setSoTimeout(int s) throws SocketException { + socket.setSoTimeout(s); + } + + public void setTcpNoDelay(boolean on) throws SocketException { + socket.setTcpNoDelay(on); + } + + public void setSocketChannelEndianness(ByteOrder bo) { + this.bufferIn.order(bo); + this.bufferOut.order(bo); + } + + public abstract int getBlockSize(); + + abstract int readToBufferIn(ByteBuffer bufferIn) throws IOException; + + abstract int writeFromBufferOut(ByteBuffer bufferOut) throws IOException; + + abstract void flush() throws IOException; + + private void readToBuffer() throws IOException { + int read = this.readToBufferIn(this.bufferIn); + if(read == 0) { + this.hasFinished = true; + throw new IOException("Done!"); + } + this.stringsDecoded.clear(); + this.asciiDecoder.reset(); + this.asciiDecoder.decode(this.bufferIn, this.stringsDecoded,true); + this.asciiDecoder.flush(this.stringsDecoded); + this.stringsDecoded.flip(); + } + + public int readLine(StringBuilder builder) throws IOException { + builder.setLength(0); + boolean found = false; + + while(!found) { + if(!this.stringsDecoded.hasRemaining()) { + this.readToBuffer(); + } + char c = this.stringsDecoded.get(); + if(c == '\n') { + found = true; + } else { + builder.append(c); + } + } + return builder.length(); + } + + private void flushOutputCharBuffer() throws IOException { + this.stringsEncoded.flip(); + this.asciiEncoder.reset(); + this.asciiEncoder.encode(this.stringsEncoded, this.bufferOut, true); + this.asciiEncoder.flush(this.bufferOut); + this.stringsEncoded.clear(); + int written = this.writeFromBufferOut(this.bufferOut); + if(written == 0) { + this.hasFinished = true; + throw new IOException("Done!"); + } else { + this.flush(); + } + } + + private void writeNextBlock(String line) throws IOException { + int limit = line.length(); + for (int i = 0; i < limit; i++) { + if (!this.stringsEncoded.hasRemaining()) { + this.flushOutputCharBuffer(); + } + this.stringsEncoded.put(line.charAt(i)); + } + } + + public void writeNextLine(String prefix, String line, String suffix) throws IOException { + if(prefix != null) { + this.writeNextBlock(prefix); + } + this.writeNextBlock(line); + if(suffix != null) { + this.writeNextBlock(suffix); + } + this.writeNextBlock("\n"); + if (this.stringsEncoded.hasRemaining()) { + this.flushOutputCharBuffer(); + } + } +}
rename from src/main/java/nl/cwi/monetdb/mcl/connection/MapiConnection.java rename to src/main/java/nl/cwi/monetdb/mcl/connection/socket/MapiConnection.java --- a/src/main/java/nl/cwi/monetdb/mcl/connection/MapiConnection.java +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/socket/MapiConnection.java @@ -1,83 +1,45 @@ -package nl.cwi.monetdb.mcl.connection; +package nl.cwi.monetdb.mcl.connection.socket; import nl.cwi.monetdb.jdbc.MonetConnection; -import nl.cwi.monetdb.mcl.io.SocketConnection; +import nl.cwi.monetdb.mcl.connection.ChannelSecurity; +import nl.cwi.monetdb.mcl.connection.ControlCommands; +import nl.cwi.monetdb.mcl.connection.MCLException; import nl.cwi.monetdb.mcl.protocol.ProtocolException; import nl.cwi.monetdb.mcl.protocol.AbstractProtocol; import nl.cwi.monetdb.mcl.protocol.ServerResponses; import nl.cwi.monetdb.mcl.protocol.oldmapi.OldMapiProtocol; -import nl.cwi.monetdb.util.ChannelSecurity; import java.io.IOException; import java.net.SocketException; +import java.net.SocketTimeoutException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteOrder; +import java.sql.SQLException; import java.util.*; -/** - * A Socket for communicating with the MonetDB database in MAPI block - * mode. - * - * The MapiSocket implements the protocol specifics of the MAPI block - * mode protocol, and interfaces it as a socket that delivers a - * BufferedReader and a BufferedWriter. Because logging in is an - * integral part of the MAPI protocol, the MapiSocket performs the login - * procedure. Like the Socket class, various options can be set before - * calling the connect() method to influence the login process. Only - * after a successful call to connect() the BufferedReader and - * BufferedWriter can be retrieved. - * <br /> - * For each line read, it is determined what type of line it is - * according to the MonetDB MAPI protocol. This results in a line to be - * PROMPT, HEADER, RESULT, ERROR or UNKNOWN. Use the getLineType() - * method on the BufferedMCLReader to retrieve the type of the last - * line read. - * - * For debugging purposes a socket level debugging is implemented where - * each and every interaction to and from the MonetDB server is logged - * to a file on disk.<br /> - * Incoming messages are prefixed by "RX" (received by the driver), - * outgoing messages by "TX" (transmitted by the driver). Special - * decoded non-human readable messages are prefixed with "RD" and "TD" - * instead. Following this two char prefix, a timestamp follows as the - * number of milliseconds since the UNIX epoch. The rest of the line is - * a String representation of the data sent or received. - * - * The general use of this Socket must be seen only in the full context - * of a MAPI connection to a server. It has the same ingredients as a - * normal Socket, allowing for seamless plugging. - * <pre> - * Socket \ / InputStream ----> (BufferedMCL)Reader - * > o < - * MapiSocket / \ OutputStream ----> (BufferedMCL)Writer - * </pre> - * The MapiSocket allows to retrieve Streams for communicating. They - * are interfaced, so they can be chained in any way. While the Socket - * transparently deals with how data is sent over the wire, the actual - * data read needs to be interpreted, for which a Reader/Writer - * interface is most sufficient. In particular the BufferedMCL* - * implementations of those interfaces supply some extra functionality - * geared towards the format of the data. - * - * @author Fabian Groffen - * @version 4.1 - */ public class MapiConnection extends MonetConnection { + public static final char PROMPT_CHAR = '.'; + /** The hostname to connect to */ private final String hostname; /** The port to connect on the host to */ private final int port; + /** The TCP Socket timeout in milliseconds. Default is 0 meaning the timeout is disabled (i.e., timeout of infinity) */ + private int soTimeout = 0; /** Whether we should follow redirects */ private boolean followRedirects = true; /** How many redirections do we follow until we're fed up with it? */ private int ttl = 10; - /** protocol version of the connection */ + /** Protocol version of the connection */ private int version; + /** Endianness of the server */ + private ByteOrder serverEndianness; - MapiConnection(Properties props, String database, String hash, String language, boolean blobIsBinary, - boolean isDebugging, String hostname, int port) throws IOException { - super(props, database, hash, language, blobIsBinary, isDebugging); + public MapiConnection(Properties props, String database, String hash, String language, boolean blobIsBinary, + boolean isDebugging, String hostname, int port) throws IOException { + super(props, database, hash, MapiLanguage.GetLanguageFromString(language), blobIsBinary, isDebugging); this.hostname = hostname; this.port = port; } @@ -98,10 +60,6 @@ public class MapiConnection extends Mone return ttl; } - public int setProtocolVersion() { - return version; - } - /** * Sets whether MCL redirections should be followed or not. If set * to false, an MCLException will be thrown when a redirect is @@ -140,25 +98,54 @@ public class MapiConnection extends Mone return this.version; } - @Override - public int getBlockSize() { - return ((OldMapiProtocol)protocol).getConnection().getBlockSize(); + public int getVersion() { + return version; + } + + public ByteOrder getServerEndianness() { + return serverEndianness; } @Override + public int getBlockSize() { + return ((OldMapiProtocol)protocol).getSocket().getBlockSize(); + } + + /** + * Gets the SO_TIMEOUT from the underlying Socket. + * + * @return the currently in use timeout in milliseconds + */ + @Override public int getSoTimeout() { try { - return ((OldMapiProtocol)protocol).getConnection().getSoTimeout(); + if(protocol != null) { + this.soTimeout = ((OldMapiProtocol)protocol).getSocket().getSoTimeout(); + } + return this.soTimeout; } catch (SocketException e) { this.addWarning("The socket timeout could not be get", "M1M05"); } return -1; } + /** + * Set the SO_TIMEOUT on the underlying Socket. When for some reason the connection to the database hangs, this + * setting can be useful to break out of this indefinite wait. This option must be enabled prior to entering the + * blocking operation to have effect. + * + * @param s The specified timeout, in milliseconds. A timeout of zero is interpreted as an infinite timeout. + */ @Override public void setSoTimeout(int s) { + if (s < 0) { + throw new IllegalArgumentException("Timeout can't be negative"); + } try { - ((OldMapiProtocol)protocol).getConnection().setSoTimeout(s); + if(protocol != null) { + ((OldMapiProtocol)protocol).getSocket().setSoTimeout(s); + } + this.soTimeout = s; } catch (SocketException e) { this.addWarning("The socket timeout could not be set", "M1M05"); } @@ -166,15 +153,15 @@ public class MapiConnection extends Mone @Override public void closeUnderlyingConnection() throws IOException { - ((OldMapiProtocol)protocol).getConnection().close(); + ((OldMapiProtocol)protocol).getSocket().close(); } @Override public String getJDBCURL() { - String language = ""; - if (this.getLanguage() == MonetDBLanguage.LANG_MAL) - language = "?language=mal"; - return "jdbc:monetdb://" + this.hostname + ":" + this.port + "/" + this.database + language; + String res = "jdbc:monetdb://" + this.hostname + ":" + this.port + "/" + this.database; + if (this.getLanguage() == MapiLanguage.LANG_MAL) + res += "?language=mal"; + return res; } @Override @@ -182,13 +169,53 @@ public class MapiConnection extends Mone return this.protocol; } + /** + * Sends the given string to MonetDB as control statement, making + * sure there is a prompt after the command is sent. All possible + * returned information is discarded. Encountered errors are + * reported. + * + * @param con the command to send + * @throws SQLException if an IO exception or a database error occurs + */ + @Override + public void sendControlCommand(ControlCommands con, int data) throws SQLException { + String command = null; + switch (con) { + case AUTO_COMMIT: + command = "auto_commit " + ((data == 1) ? "1" : "0"); + break; + case REPLY_SIZE: + command = "reply_size " + data; + break; + case RELEASE: + command = "release " + data; + break; + case CLOSE: + command = "close " + data; + } + synchronized (protocol) { + try { + protocol.writeNextQuery(language.getCommandTemplateIndex(0), command, language.getCommandTemplateIndex(1)); + protocol.waitUntilPrompt(); + if (protocol.getCurrentServerResponseHeader() == ServerResponses.ERROR) { + String error = protocol.getRemainingStringLine(0); + throw new SQLException(error.substring(6), error.substring(0, 5)); + } + } catch (SocketTimeoutException e) { + close(); // JDBC 4.1 semantics, abort() + throw new SQLException("connection timed out", "08M33"); + } catch (IOException e) { + throw new SQLException(e.getMessage(), "08000"); + } + } + } + @Override public List<String> connect(String user, String pass) throws IOException, ProtocolException, MCLException { - // Wrap around the internal connect that needs to know if it - // should really make a TCP connection or not. + // Wrap around the internal connect that needs to know if it should really make a TCP connection or not. List<String> res = connect(this.hostname, this.port, user, pass, true); - // apply NetworkTimeout value from legacy (pre 4.1) driver - // so_timeout calls + // apply NetworkTimeout value from legacy (pre 4.1) driver so_timeout calls this.setSoTimeout(this.getSoTimeout()); return res; } @@ -198,27 +225,19 @@ public class MapiConnection extends Mone if (ttl-- <= 0) throw new MCLException("Maximum number of redirects reached, aborting connection attempt. Sorry."); - AbstractProtocol<?> pro; - if (makeConnection) { - pro = new OldMapiProtocol(new SocketConnection(this.hostname, this.port)); - this.protocol = pro; - ((OldMapiProtocol)pro).getConnection().setTcpNoDelay(true); - - // set nodelay, as it greatly speeds up small messages (like we - // often do) - //TODO writer.registerReader(reader); ?? - } else { - pro = this.protocol; + this.protocol = new OldMapiProtocol(new OldMapiSocket(this.hostname, this.port, this)); + //set nodelay, as it greatly speeds up small messages (like we often do) + ((OldMapiProtocol)this.protocol).getSocket().setTcpNoDelay(true); + ((OldMapiProtocol)this.protocol).getSocket().setSoTimeout(this.soTimeout); } - pro.fetchNextResponseData(); - pro.waitUntilPrompt(); - String firstLine = pro.getRemainingStringLine(0); - - String test = this.getChallengeResponse(firstLine, user, pass, this.language.getRepresentation(), + this.protocol.fetchNextResponseData(); + String nextLine = this.protocol.getCurrentData().toString(); + this.protocol.waitUntilPrompt(); + String test = this.getChallengeResponse(nextLine, user, pass, this.language.getRepresentation(), this.database, this.hash); - pro.writeNextCommand(MonetDBLanguage.EmptyString, test.getBytes(), MonetDBLanguage.EmptyString); + this.protocol.writeNextQuery("", test, ""); List<String> redirects = new ArrayList<>(); List<String> warns = new ArrayList<>(); @@ -226,16 +245,17 @@ public class MapiConnection extends Mone ServerResponses next; do { - pro.fetchNextResponseData(); - next = pro.getCurrentServerResponseHeader(); + this.protocol.fetchNextResponseData(); + next = this.protocol.getCurrentServerResponseHeader(); switch (next) { case ERROR: - err += "\n" + pro.getRemainingStringLine(7); + err += "\n" + this.protocol.getRemainingStringLine(7); break; case INFO: - warns.add(pro.getRemainingStringLine(1)); + warns.add(this.protocol.getRemainingStringLine(1)); + break; case REDIRECT: - redirects.add(pro.getRemainingStringLine(1)); + redirects.add(this.protocol.getRemainingStringLine(1)); } } while (next != ServerResponses.PROMPT); @@ -245,15 +265,10 @@ public class MapiConnection extends Mone } if (!redirects.isEmpty()) { if (followRedirects) { - // Ok, server wants us to go somewhere else. The list - // might have multiple clues on where to go. For now we - // don't support anything intelligent but trying the - // first one. URI should be in form of: - // "mapi:monetdb://host:port/database?arg=value&..." - // or - // "mapi:merovingian://proxy?arg=value&..." - // note that the extra arguments must be obeyed in both - // cases + // Ok, server wants us to go somewhere else. The list might have multiple clues on where to go. For now + // we don't support anything intelligent but trying the first one. URI should be in form of: + // "mapi:monetdb://host:port/database?arg=value&..." or "mapi:merovingian://proxy?arg=value&..." note + // that the extra arguments must be obeyed in both cases String suri = redirects.get(0); if (!suri.startsWith("mapi:")) throw new MCLException("unsupported redirect: " + suri); @@ -264,7 +279,6 @@ public class MapiConnection extends Mone } catch (URISyntaxException e) { throw new ProtocolException(e.toString()); } - String tmp = u.getQuery(); if (tmp != null) { String args[] = tmp.split("&"); @@ -283,7 +297,7 @@ public class MapiConnection extends Mone case "language": tmp = arg.substring(pos + 1); warns.add("redirect specifies use of different language: " + tmp); - this.language = MonetDBLanguage.GetLanguageFromString(tmp); + this.language = MapiLanguage.GetLanguageFromString(tmp); break; case "user": tmp = arg.substring(pos + 1); @@ -307,9 +321,8 @@ public class MapiConnection extends Mone switch (u.getScheme()) { case "monetdb": - // this is a redirect to another (monetdb) server, - // which means a full reconnect - // avoid the debug log being closed + // this is a redirect to another (monetdb) server, which means a full reconnect avoid the debug + // log being closed if (this.isDebugging) { this.isDebugging = false; this.close(); @@ -330,8 +343,7 @@ public class MapiConnection extends Mone warns.add("Redirect by " + host + ":" + port + " to " + suri); break; case "merovingian": - // reuse this connection to inline connect to the - // right database that Merovingian proxies for us + // reuse this connection to inline connect to the right database that Merovingian proxies for us warns.addAll(connect(host, port, user, pass, false)); break; default: @@ -349,17 +361,15 @@ public class MapiConnection extends Mone } /** - * A little helper function that processes a challenge string, and - * returns a response string for the server. If the challenge - * string is null, a challengeless response is returned. + * A little helper function that processes a challenge string, and returns a response string for the server. + * If the challenge string is null, a challengeless response is returned. * * @param chalstr the challenge string * @param username the username to use * @param password the password to use * @param language the language to use * @param database the database to connect to - * @param hash the hash method(s) to use, or NULL for all supported - * hashes + * @param hash the hash method(s) to use, or NULL for all supported hashes */ private String getChallengeResponse(String chalstr, String username, String password, String language, String database, String hash) @@ -370,27 +380,35 @@ public class MapiConnection extends Mone // parse the challenge string, split it on ':' String[] chaltok = chalstr.split(":"); if (chaltok.length <= 4) - throw new ProtocolException("Server challenge string unusable! Challenge contains too few tokens: " + throw new ProtocolException("Server challenge string unusable! Challenge contains too few tokens: " + chalstr); // challenge string to use as salt/key String challenge = chaltok[0]; String servert = chaltok[1]; try { - version = Integer.parseInt(chaltok[2].trim()); // protocol version + this.version = Integer.parseInt(chaltok[2].trim()); // protocol version } catch (NumberFormatException e) { - throw new ProtocolException("Protocol version unparseable: " + chaltok[3]); + throw new ProtocolException("Protocol version unparseable: " + chaltok[2]); } + switch (chaltok[4]) { + case "BIG": + this.serverEndianness = ByteOrder.BIG_ENDIAN; + break; + case "LIT": + this.serverEndianness = ByteOrder.LITTLE_ENDIAN; + break; + default: + throw new ProtocolException("Invalid byte-order: " + chaltok[4]); + } + ((OldMapiProtocol)protocol).getSocket().setSocketChannelEndianness(this.serverEndianness); + // handle the challenge according to the version it is - switch (version) { - default: - throw new MCLException("Unsupported protocol version: " + version); + switch (this.version) { case 9: - // proto 9 is like 8, but uses a hash instead of the - // plain password, the server tells us which hash in the - // challenge after the byte-order - + // proto 9 is like 8, but uses a hash instead of the plain password, the server tells us which hash in + // the challenge after the byte-order /* NOTE: Java doesn't support RIPEMD160 :( */ switch (chaltok[5]) { case "SHA512": @@ -401,7 +419,7 @@ public class MapiConnection extends Mone break; case "SHA256": algo = "SHA-256"; - /* NOTE: Java doesn't support SHA-224 */ + /* NOTE: Java supports SHA-224 only on 8 */ break; case "SHA1": algo = "SHA-1"; @@ -413,18 +431,13 @@ public class MapiConnection extends Mone throw new MCLException("Unsupported password hash: " + chaltok[5]); } - password = ChannelSecurity.DigestStrings(algo, password); + password = ChannelSecurity.DigestStrings(algo, password.getBytes("UTF-8")); - // proto 7 (finally) used the challenge and works with a - // password hash. The supported implementations come - // from the server challenge. We chose the best hash - // we can find, in the order SHA1, MD5, plain. Also, - // the byte-order is reported in the challenge string, - // which makes sense, since only blockmode is supported. - // proto 8 made this obsolete, but retained the - // byte-order report for future "binary" transports. In - // proto 8, the byte-order of the blocks is always little - // endian because most machines today are. + // proto 7 (finally) used the challenge and works with a password hash. The supported implementations + // come from the server challenge. We chose the best hash we can find, in the order SHA1, MD5, plain. + // Also, the byte-order is reported in the challenge string. proto 8 made this obsolete, but retained + // the byte-order report for future "binary" transports. In proto 8, the byte-order of the blocks is + // always little endian because most machines today are. String hashes = (hash == null ? chaltok[3] : hash); Set<String> hashesSet = new HashSet<>(Arrays.asList(hashes.toUpperCase().split("[, ]"))); @@ -454,20 +467,8 @@ public class MapiConnection extends Mone throw new MCLException("No supported password hashes in " + hashes); } - pwhash += ChannelSecurity.DigestStrings(algo, password, challenge); - - // TODO: some day when we need this, we should store - // this - switch (chaltok[4]) { - case "BIG": - // byte-order of server is big-endian - break; - case "LIT": - // byte-order of server is little-endian - break; - default: - throw new ProtocolException("Invalid byte-order: " + chaltok[5]); - } + pwhash += ChannelSecurity.DigestStrings(algo, password.getBytes("UTF-8"), + challenge.getBytes("UTF-8")); // generate response response = "BIG:"; // JVM byte-order is big-endian @@ -479,6 +480,8 @@ public class MapiConnection extends Mone this.conn_props.setProperty("database", database); return response; + default: + throw new MCLException("Unsupported protocol version: " + version); } } }
rename from src/main/java/nl/cwi/monetdb/mcl/connection/MonetDBLanguage.java rename to src/main/java/nl/cwi/monetdb/mcl/connection/socket/MapiLanguage.java --- a/src/main/java/nl/cwi/monetdb/mcl/connection/MonetDBLanguage.java +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/socket/MapiLanguage.java @@ -1,53 +1,57 @@ -package nl.cwi.monetdb.mcl.connection; +package nl.cwi.monetdb.mcl.connection.socket; + +import nl.cwi.monetdb.mcl.connection.IMonetDBLanguage; /** * Created by ferreira on 11/30/16. */ -public enum MonetDBLanguage { +public enum MapiLanguage implements IMonetDBLanguage { /** the SQL language */ - LANG_SQL(new byte[][]{"s".getBytes(), "\n;".getBytes(), "\n;\n".getBytes()}, new byte[][]{"X".getBytes(), null, - "\nX".getBytes()}, "sql"), + LANG_SQL(new String[]{"s", "\n;", "\n;\n"}, new String[]{"X", null, "\nX"}, "sql"), /** the MAL language (officially *NOT* supported) */ - LANG_MAL(new byte[][]{null, ";\n".getBytes(), ";\n".getBytes()}, new byte[][]{null, null, null}, "mal"), + LANG_MAL(new String[]{null, ";\n", ";\n"}, new String[]{null, null, null}, "mal"), /** an unknown language */ LANG_UNKNOWN(null, null, "unknown"); - MonetDBLanguage(byte[][] queryTemplates, byte[][] commandTemplates, String representation) { + MapiLanguage(String[] queryTemplates, String[] commandTemplates, String representation) { this.queryTemplates = queryTemplates; this.commandTemplates = commandTemplates; this.representation = representation; } - private final byte[][] queryTemplates; + private final String[] queryTemplates; - private final byte[][] commandTemplates; + private final String[] commandTemplates; private final String representation; - public byte[] getQueryTemplateIndex(int index) { + @Override + public String getQueryTemplateIndex(int index) { return queryTemplates[index]; } - public byte[] getCommandTemplateIndex(int index) { + @Override + public String getCommandTemplateIndex(int index) { return commandTemplates[index]; } - public byte[][] getQueryTemplates() { + @Override + public String[] getQueryTemplates() { return queryTemplates; } - public byte[][] getCommandTemplates() { + @Override + public String[] getCommandTemplates() { return commandTemplates; } + @Override public String getRepresentation() { return representation; } - public static final byte[] EmptyString = "".getBytes(); - - public static MonetDBLanguage GetLanguageFromString(String language) { + public static MapiLanguage GetLanguageFromString(String language) { switch (language) { case "sql": return LANG_SQL;
new file mode 100644 --- /dev/null +++ b/src/main/java/nl/cwi/monetdb/mcl/connection/socket/OldMapiSocket.java @@ -0,0 +1,320 @@ +package nl.cwi.monetdb.mcl.connection.socket; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * Created by ferreira on 12/9/16. + */ +public class OldMapiSocket extends AbstractSocket { + + /** The blocksize (hardcoded in compliance with stream.mx) */ + private final static int BLOCK = 8 * 1024 - 2; + + /** + * A short in two bytes for holding the block size in bytes + */ + private final byte[] blklen = new byte[2]; + + private final OldMapiBlockInputStream inStream; + + private final OldMapiBlockOutputStream outStream; + + OldMapiSocket(String hostname, int port, MapiConnection connection) throws IOException { + super(hostname, port, connection); + this.inStream = new OldMapiBlockInputStream(socket.getInputStream()); + this.outStream = new OldMapiBlockOutputStream(socket.getOutputStream()); + } + + @Override + public int getBlockSize() { + return BLOCK; + } + + @Override + int readToBufferIn(ByteBuffer bufferIn) throws IOException { + return this.inStream.read(bufferIn); + } + + @Override + int writeFromBufferOut(ByteBuffer bufferOut) throws IOException { + return this.outStream.write(bufferOut); + } + + @Override + void flush() throws IOException { + this.outStream.flush(); + } + + @Override + public void close() throws IOException { + this.socket.close(); + } + + private class OldMapiBlockInputStream { + + private final InputStream inStream; + + private int readPos = 0; + + private int blockLen = 0; + + private final byte[] block = new byte[BLOCK]; + + /** + * Constructs this BlockInputStream, backed by the given InputStream. A BufferedInputStream is internally used. + */ + OldMapiBlockInputStream(InputStream in) { + this.inStream = in; + } + + public int available() { + return blockLen - readPos; + } + + /** + * Small wrapper to get a blocking variant of the read() method + * on the BufferedInputStream. We want to benefit from the + * Buffered pre-fetching, but not dealing with half blocks. + * Changing this class to be able to use the partially received + * data will greatly complicate matters, while a performance + * improvement is debatable given the relatively small size of + * our blocks. Maybe it does speed up on slower links, then + * consider this method a quick bug fix/workaround. + * + * @return false if reading the block failed due to EOF + */ + private boolean _read(byte[] b, int len) throws IOException { + int s; + int off = 0; + + while (len > 0) { + s = inStream.read(b, off, len); + if (s == -1) { + // if we have read something before, we should have been + // able to read the whole, so make this fatal + if (off > 0) { + throw new IOException("Read from " + connection.getHostname() + ":" + + connection.getPort() + ": Incomplete block read from stream"); + } + return false; + } + len -= s; + off += s; + } + return true; + } + + /** + * Reads the next block on the stream into the internal buffer, + * or writes the prompt in the buffer. + * <p> + * The blocked stream protocol consists of first a two byte + * integer indicating the length of the block, then the + * block, followed by another length + block. The end of + * such sequence is put in the last bit of the length, and + * hence this length should be shifted to the right to + * obtain the real length value first. We simply fetch + * blocks here as soon as they are needed for the stream's + * read methods. + * <p> + * The user-flush, which is an implicit effect of the end of + * a block sequence, is communicated beyond the stream by + * inserting a prompt sequence on the stream after the last + * block. This method makes sure that a final block ends with a + * newline, if it doesn't already, in order to facilitate a + * Reader that is possibly chained to this InputStream. + * <p> + * If the stream is not positioned correctly, hell will break + * loose. + */ + private int readBlock() throws IOException { + // read next two bytes (short) + if (!_read(blklen, 2)) + return -1; + + // Get the short-value and store its value in blockLen. + blockLen = (short) ((blklen[0] & 0xFF) >> 1 | (blklen[1] & 0xFF) << 7); + readPos = 0; + + // sanity check to avoid bad servers make us do an ugly stack trace + if (blockLen > block.length) + throw new AssertionError("Server sent a block larger than BLOCKsize: " + + blockLen + " > " + block.length); + if (!_read(block, blockLen)) + return -1; + + // if this is the last block, make it end with a newline and prompt + if ((blklen[0] & 0x1) == 1) { + if (blockLen > 0 && block[blockLen - 1] != '\n') { + // to terminate the block in a Reader + block[blockLen++] = '\n'; + } + // insert 'fake' flush + block[blockLen++] = MapiConnection.PROMPT_CHAR; + block[blockLen++] = '\n'; + } + + return blockLen; + } + + public int read() throws IOException { + if (available() == 0) { + if (readBlock() == -1) + return -1; + } + return (int) block[readPos++]; + } + + public int read(ByteBuffer b) throws IOException { + return read(b, 0, b.capacity()); + } + + public int read(ByteBuffer b, int off, int len) throws IOException { + b.clear(); + int t; + int size = 0; + while (size < len) { + t = available(); + if (t == 0) { + if (size != 0) + break; + if (readBlock() == -1) { + size = -1; + break; + } + t = available(); + } + if (len > t) { + System.arraycopy(block, readPos, b.array(), off, t); + off += t; + len -= t; + readPos += t; + size += t; + } else { + System.arraycopy(block, readPos, b.array(), off, len); + readPos += len; + size += len; + break; + } + } + b.position(size); + b.flip(); + return size; + } + + public long skip(long n) throws IOException { + long skip = n; + int t = 0; + while (skip > 0) { + t = available(); + if (skip > t) { + skip -= t; + readPos += t; + readBlock(); + } else { + readPos += skip; + break; + } + } + return n; + } + } + + class OldMapiBlockOutputStream { + + private final OutputStream outStream; + + private int writePos = 0; + + private byte[] block = new byte[BLOCK]; + + private int blocksize = 0; + + /** + * Constructs this BlockOutputStream, backed by the given OutputStream. A BufferedOutputStream is internally + * used. + */ + OldMapiBlockOutputStream(OutputStream out) { + this.outStream = out; + } + + void flush() throws IOException { + // write the block (as final) then flush. + writeBlock(true); + outStream.flush(); + } + + /** + * writeBlock puts the data in the block on the stream. The + * boolean last controls whether the block is sent with an + * indicator to note it is the last block of a sequence or not. + * + * @param last whether this is the last block + * @throws IOException if writing to the stream failed + */ + void writeBlock(boolean last) throws IOException { + if (last) { + // always fits, because of BLOCK's size + blocksize = (short) writePos; + // this is the last block, so encode least + // significant bit in the first byte (little-endian) + blklen[0] = (byte) (blocksize << 1 & 0xFF | 1); + blklen[1] = (byte) (blocksize >> 7); + } else { + // always fits, because of BLOCK's size + blocksize = (short) BLOCK; + // another block will follow, encode least + // significant bit in the first byte (little-endian) + blklen[0] = (byte) (blocksize << 1 & 0xFF); + blklen[1] = (byte) (blocksize >> 7); + } + outStream.write(blklen); + // write the actual block + outStream.write(block, 0, writePos); + writePos = 0; + } + + void write(int b) throws IOException { + if (writePos == BLOCK) { + writeBlock(false); + } + block[writePos++] = (byte) b; + } + + int write(ByteBuffer b) throws IOException { + return write(b, 0, b.position()); + } + + int write(ByteBuffer b, int off, int len) throws IOException { + int t, written = 0; + b.flip(); + while (len > 0) { + t = BLOCK - writePos; + if (len > t) { + System.arraycopy(b.array(), off, block, writePos, t); + off += t; + len -= t; + writePos += t; + written += t; + writeBlock(false); + } else { + System.arraycopy(b.array(), off, block, writePos, len); + writePos += len; + written += len; + break; + } + } + b.clear(); + return written; + } + + public void close() throws IOException { + // we don't want the flush() method to be called (default of the FilterOutputStream), so we close manually + // here + outStream.close(); + } + } +}
deleted file mode 100644 --- a/src/main/java/nl/cwi/monetdb/mcl/io/SocketConnection.java +++ /dev/null @@ -1,183 +0,0 @@ -package nl.cwi.monetdb.mcl.io; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -/** - * Created by ferreira on 11/29/16. - */ -public class SocketConnection implements Closeable { - - /** The blocksize (hardcoded in compliance with stream.mx) */ - private static final int BLOCK = 8 * 1024 - 2; - - private static final int CHAR_SIZE = Character.SIZE / Byte.SIZE; - - private static final int SHORT_SIZE = Short.SIZE / Byte.SIZE; - - private static final int INTEGER_SIZE = Integer.SIZE / Byte.SIZE; - - private static final int LONG_SIZE = Long.SIZE / Byte.SIZE; - - private static final int FLOAT_SIZE = Float.SIZE / Byte.SIZE; - - private static final int DOUBLE_SIZE = Double.SIZE / Byte.SIZE; - - private static final int INTERMEDIATE_BUFFER_SIZE = 1024; - - /* Local variables */ - private boolean hasFinished; - - /** Bytebuffers */ - private final ByteBuffer bufferIn = ByteBuffer.allocateDirect(INTERMEDIATE_BUFFER_SIZE); - - private final ByteBuffer bufferOut = ByteBuffer.allocateDirect(INTERMEDIATE_BUFFER_SIZE); - - /** The socket channel */ - private final SocketChannel connection; - - public SocketConnection(String hostname, int port) throws IOException { - this.connection = SocketChannel.open(new InetSocketAddress(hostname, port)); - this.connection.configureBlocking(true); - } - - /* Socket Channel methods */ - - public int getSoTimeout() throws SocketException { - return connection.socket().getSoTimeout(); - } - - public void setSoTimeout(int s) throws SocketException { - connection.socket().setSoTimeout(s); - } - - public int getBlockSize() { - return BLOCK; - } - - public int readMore(ByteBuffer dst) throws IOException { - return connection.read(dst); - } - - public int writeMore(ByteBuffer src) throws IOException { - return connection.write(src); - } - - public void setTcpNoDelay(boolean on) throws SocketException { - this.connection.socket().setTcpNoDelay(on); - } - - @Override - public void close() throws IOException { - this.hasFinished = true; - this.connection.close(); - } - - /* Byte buffer methods */ - - private void refillBufferIn() throws IOException { - bufferIn.compact(); - if(!hasFinished) { - try { - connection.read(this.bufferIn); - bufferIn.flip(); - } catch (IOException ex) { - hasFinished = true; - } - } else { - throw new IOException("Done!"); - } - } - - - public byte readNextByte() throws IOException { - if(this.bufferIn.remaining() < Byte.SIZE) { - this.refillBufferIn(); - } - return this.bufferIn.get(); - } - - public char readNextChar() throws IOException { - if(this.bufferIn.remaining() < CHAR_SIZE) { - this.refillBufferIn(); - } - return this.bufferIn.getChar(); - } - - public short readNextShort() throws IOException { - if(this.bufferIn.remaining() < SHORT_SIZE) { - this.refillBufferIn(); - } - return this.bufferIn.getShort(); - } - - public int readNextInt() throws IOException { - if(this.bufferIn.remaining() < INTEGER_SIZE) { - this.refillBufferIn(); - } - return this.bufferIn.getInt(); - } - - public long readNextLong() throws IOException { - if(this.bufferIn.remaining() < LONG_SIZE) { - this.refillBufferIn(); - } - return this.bufferIn.getLong(); - } - - public float readNextFloat() throws IOException { - if(this.bufferIn.remaining() < FLOAT_SIZE) { - this.refillBufferIn(); - } - return this.bufferIn.getFloat(); - } - - public double readNextDouble() throws IOException { - if(this.bufferIn.remaining() < DOUBLE_SIZE) { - this.refillBufferIn(); - } - return this.bufferIn.getDouble(); - } - - public int readUntilChar(StringBuilder builder, char limit) throws IOException { - boolean found = false; - - while(!found) { - if (this.bufferIn.remaining() < CHAR_SIZE) { - this.refillBufferIn(); - } - char next = this.bufferIn.getChar(); - builder.append(next); - if(next == limit) { - found = true; - } - } - return builder.length(); - } - - public void writeNextLine(byte[] prefix, byte[] line, byte[] suffix) throws IOException { - bufferOut.clear(); - this.writeNextBlock(prefix); - this.writeNextBlock(line); - this.writeNextBlock(suffix); - if (bufferOut.hasRemaining()) { - bufferOut.flip(); - connection.write(this.bufferOut); - } - } - - private void writeNextBlock(byte[] block) throws IOException { - for (byte aBlock : block) { - if (!bufferOut.hasRemaining()) { - bufferOut.flip(); - connection.write(this.bufferOut); - bufferOut.clear(); - } - bufferOut.put(aBlock); - } - } -}
--- a/src/main/java/nl/cwi/monetdb/mcl/protocol/AbstractProtocol.java +++ b/src/main/java/nl/cwi/monetdb/mcl/protocol/AbstractProtocol.java @@ -17,14 +17,9 @@ public abstract class AbstractProtocol<T protected ServerResponses currentServerResponseHeader = ServerResponses.UNKNOWN; - public ServerResponses waitUntilPrompt() { - while(this.currentServerResponseHeader != ServerResponses.PROMPT) { - this.fetchNextResponseData(); - } - return this.currentServerResponseHeader; - } + public abstract ServerResponses waitUntilPrompt() throws IOException; - public abstract void fetchNextResponseData(); //UPDATE currentData!!! + public abstract void fetchNextResponseData() throws IOException; //UPDATE currentData!!! public ServerResponses getCurrentServerResponseHeader() { return currentServerResponseHeader; @@ -55,5 +50,5 @@ public abstract class AbstractProtocol<T public abstract String getRemainingStringLine(int startIndex); - public abstract void writeNextCommand(byte[] prefix, byte[] query, byte[] suffix) throws IOException; + public abstract void writeNextQuery(String prefix, String query, String suffix) throws IOException; }
--- a/src/main/java/nl/cwi/monetdb/mcl/protocol/embedded/EmbeddedProtocol.java +++ b/src/main/java/nl/cwi/monetdb/mcl/protocol/embedded/EmbeddedProtocol.java @@ -1,11 +1,8 @@ package nl.cwi.monetdb.mcl.protocol.embedded; import nl.cwi.monetdb.jdbc.MonetConnection; -import nl.cwi.monetdb.mcl.io.JDBCEmbeddedConnection; -import nl.cwi.monetdb.mcl.protocol.ProtocolException; -import nl.cwi.monetdb.mcl.protocol.AbstractProtocol; -import nl.cwi.monetdb.mcl.protocol.StarterHeaders; -import nl.cwi.monetdb.mcl.protocol.TableResultHeaders; +import nl.cwi.monetdb.mcl.connection.embedded.JDBCEmbeddedConnection; +import nl.cwi.monetdb.mcl.protocol.*; import nl.cwi.monetdb.mcl.responses.AutoCommitResponse; import nl.cwi.monetdb.mcl.responses.DataBlockResponse; import nl.cwi.monetdb.mcl.responses.ResultSetResponse; @@ -30,7 +27,12 @@ public class EmbeddedProtocol extends Ab } @Override - public void fetchNextResponseData() { + public ServerResponses waitUntilPrompt() throws IOException { + return null; + } + + @Override + public void fetchNextResponseData() throws IOException { } @@ -80,7 +82,7 @@ public class EmbeddedProtocol extends Ab } @Override - public void writeNextCommand(byte[] prefix, byte[] query, byte[] suffix) throws IOException { + public void writeNextQuery(String prefix, String query, String suffix) throws IOException { } }
--- a/src/main/java/nl/cwi/monetdb/mcl/protocol/newmapi/NewMapiProtocol.java +++ b/src/main/java/nl/cwi/monetdb/mcl/protocol/newmapi/NewMapiProtocol.java @@ -1,11 +1,7 @@ package nl.cwi.monetdb.mcl.protocol.newmapi; import nl.cwi.monetdb.jdbc.MonetConnection; -import nl.cwi.monetdb.mcl.io.SocketConnection; -import nl.cwi.monetdb.mcl.protocol.ProtocolException; -import nl.cwi.monetdb.mcl.protocol.AbstractProtocol; -import nl.cwi.monetdb.mcl.protocol.StarterHeaders; -import nl.cwi.monetdb.mcl.protocol.TableResultHeaders; +import nl.cwi.monetdb.mcl.protocol.*; import nl.cwi.monetdb.mcl.responses.AutoCommitResponse; import nl.cwi.monetdb.mcl.responses.DataBlockResponse; import nl.cwi.monetdb.mcl.responses.ResultSetResponse; @@ -19,14 +15,14 @@ import java.util.Map; */ public class NewMapiProtocol extends AbstractProtocol<Object[]> { - private final SocketConnection connection; - - public NewMapiProtocol(SocketConnection con) { - this.connection = con; + @Override + public ServerResponses waitUntilPrompt() throws IOException { + return null; } @Override - public void fetchNextResponseData() { + public void fetchNextResponseData() throws IOException { + } @Override @@ -75,7 +71,7 @@ public class NewMapiProtocol extends Abs } @Override - public void writeNextCommand(byte[] prefix, byte[] query, byte[] suffix) throws IOException { + public void writeNextQuery(String prefix, String query, String suffix) throws IOException { } }
--- a/src/main/java/nl/cwi/monetdb/mcl/protocol/oldmapi/OldMapiProtocol.java +++ b/src/main/java/nl/cwi/monetdb/mcl/protocol/oldmapi/OldMapiProtocol.java @@ -1,7 +1,7 @@ package nl.cwi.monetdb.mcl.protocol.oldmapi; import nl.cwi.monetdb.jdbc.MonetConnection; -import nl.cwi.monetdb.mcl.io.SocketConnection; +import nl.cwi.monetdb.mcl.connection.socket.OldMapiSocket; import nl.cwi.monetdb.mcl.protocol.ProtocolException; import nl.cwi.monetdb.mcl.protocol.AbstractProtocol; import nl.cwi.monetdb.mcl.protocol.ServerResponses; @@ -22,7 +22,7 @@ public class OldMapiProtocol extends Abs private static final int STRING_BUILDER_INITIAL_SIZE = 128; - private final SocketConnection connection; + private final OldMapiSocket socket; final StringBuilder builder; @@ -30,14 +30,14 @@ public class OldMapiProtocol extends Abs private final StringBuilder tupleLineBuilder; - public OldMapiProtocol(SocketConnection con) { - this.connection = con; + public OldMapiProtocol(OldMapiSocket socket) { + this.socket = socket; this.builder = new StringBuilder(STRING_BUILDER_INITIAL_SIZE); this.tupleLineBuilder = new StringBuilder(STRING_BUILDER_INITIAL_SIZE); } - public SocketConnection getConnection() { - return connection; + public OldMapiSocket getSocket() { + return socket; } boolean hasRemaining() { @@ -45,28 +45,30 @@ public class OldMapiProtocol extends Abs } @Override - public ServerResponses waitUntilPrompt() { - this.builder.setLength(0); - this.currentPointer = 0; - return super.waitUntilPrompt(); + public ServerResponses waitUntilPrompt() throws IOException { + while(this.currentServerResponseHeader != ServerResponses.PROMPT) { + if(this.socket.readLine(this.builder) == 0) { + throw new IOException("Connection to server lost!"); + } + this.currentPointer = 0; + this.currentServerResponseHeader = OldMapiServerResponseParser.ParseOldMapiServerResponse(this); + if (this.currentServerResponseHeader == ServerResponses.ERROR) { + this.currentPointer = 1; + } + } + return this.currentServerResponseHeader; } @Override - public void fetchNextResponseData() { - ServerResponses res; - try { - int bytesRead = connection.readUntilChar(this.builder, '\n'); - res = OldMapiServerResponseParser.ParseOldMapiServerResponse(this); - if(res == ServerResponses.ERROR && !this.builder.substring(bytesRead).matches("^![0-9A-Z]{5}!.+")) { - this.builder.insert(bytesRead, "!22000!"); - } - } catch (IOException e) { - res = ServerResponses.ERROR; - this.builder.setLength(0); - this.currentPointer = 0; - this.builder.append("!22000!").append(e.getMessage()); + public void fetchNextResponseData() throws IOException { //readLine equivalent + this.socket.readLine(this.builder); + this.currentPointer = 0; + this.currentServerResponseHeader = OldMapiServerResponseParser.ParseOldMapiServerResponse(this); + if (this.currentServerResponseHeader == ServerResponses.ERROR && !this.builder.toString().matches("^![0-9A-Z]{5}!.+")) { + //this.builder.deleteCharAt(0); + this.builder.insert(0, "!22000!"); } - this.currentServerResponseHeader = res; + this.currentPointer = 1; } @Override @@ -76,7 +78,9 @@ public class OldMapiProtocol extends Abs @Override public StarterHeaders getNextStarterHeader() { - return OldMapiStartOfHeaderParser.GetNextStartHeaderOnOldMapi(this); + StarterHeaders res = OldMapiStartOfHeaderParser.GetNextStartHeaderOnOldMapi(this); + this.currentPointer += 2; + return res; } @Override @@ -135,7 +139,8 @@ public class OldMapiProtocol extends Abs } @Override - public void writeNextCommand(byte[] prefix, byte[] query, byte[] suffix) throws IOException { - this.connection.writeNextLine(prefix, query, suffix); + public void writeNextQuery(String prefix, String query, String suffix) throws IOException { + this.socket.writeNextLine(prefix, query, suffix); + this.currentServerResponseHeader = ServerResponses.UNKNOWN; //reset reader state } }
--- a/src/main/java/nl/cwi/monetdb/mcl/protocol/oldmapi/OldMapiStartOfHeaderParser.java +++ b/src/main/java/nl/cwi/monetdb/mcl/protocol/oldmapi/OldMapiStartOfHeaderParser.java @@ -39,12 +39,12 @@ final class OldMapiStartOfHeaderParser { } static int GetNextResponseDataAsInt(OldMapiProtocol protocol) throws ProtocolException { - protocol.currentPointer++; if (!protocol.hasRemaining()) { throw new ProtocolException("unexpected end of string", protocol.currentPointer - 1); } int tmp; char chr = protocol.builder.charAt(protocol.currentPointer); + protocol.currentPointer++; // note: don't use Character.isDigit() here, because // we only want ISO-LATIN-1 digits if (chr >= '0' && chr <= '9') { @@ -70,7 +70,6 @@ final class OldMapiStartOfHeaderParser { } static String GetNextResponseDataAsString(OldMapiProtocol protocol) throws ProtocolException { - protocol.currentPointer++; if (!protocol.hasRemaining()) { throw new ProtocolException("unexpected end of string", protocol.currentPointer - 1); }
--- a/src/main/java/nl/cwi/monetdb/mcl/responses/ResultSetResponse.java +++ b/src/main/java/nl/cwi/monetdb/mcl/responses/ResultSetResponse.java @@ -2,6 +2,7 @@ package nl.cwi.monetdb.mcl.responses; import nl.cwi.monetdb.jdbc.MonetConnection; import nl.cwi.monetdb.jdbc.MonetDriver; +import nl.cwi.monetdb.mcl.connection.ControlCommands; import nl.cwi.monetdb.mcl.protocol.AbstractProtocol; import nl.cwi.monetdb.mcl.protocol.ProtocolException; import nl.cwi.monetdb.mcl.protocol.ServerResponses; @@ -369,7 +370,7 @@ public class ResultSetResponse implement // result was larger than the reply size try { if (destroyOnClose) { - con.sendControlCommand("close " + id); + con.sendControlCommand(ControlCommands.CLOSE, id); } } catch (SQLException e) { // probably a connection error...