Mercurial > hg > monetdb-java
view src/main/java/nl/cwi/monetdb/responses/ResponseList.java @ 64:bb0d66ad7dc6 embedded
More done
author | Pedro Ferreira <pedro.ferreira@monetdbsolutions.com> |
---|---|
date | Thu, 01 Dec 2016 16:52:27 +0100 (2016-12-01) |
parents | |
children | e605cdd6373f |
line wrap: on
line source
package nl.cwi.monetdb.responses; import nl.cwi.monetdb.jdbc.MonetConnection; import nl.cwi.monetdb.mcl.io.AbstractMCLReader; import nl.cwi.monetdb.mcl.parser.MCLParseException; import nl.cwi.monetdb.mcl.parser.StartOfHeaderParser; import java.io.IOException; import java.net.SocketTimeoutException; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * A list of Response objects. Responses are added to this list. * Methods of this class are not synchronized. This is left as * responsibility to the caller to prevent concurrent access. */ public class ResponseList { /** the default number of rows that are (attempted to) read at once */ protected final static int DEF_FETCHSIZE = 250; /** The sequence counter */ protected static int SeqCounter = 0; /** The cache size (number of rows in a DataBlockResponse object) */ private final int cachesize; /** The maximum number of results for this query */ private final int maxrows; /** The ResultSet type to produce */ final int rstype; /** The ResultSet concurrency to produce */ final int rsconcur; /** The sequence number of this ResponseList */ private final int seqnr; /** A list of the Responses associated with the query, * in the right order */ private List<IResponse> responses; /** A map of ResultSetResponses, used for additional * DataBlockResponse mapping */ private Map<Integer, ResultSetResponse> rsresponses; /** The current header returned by getNextResponse() */ private int curResponse; /** * Main constructor. The query argument can either be a String * or List. An SQLException is thrown if another object * instance is supplied. * * @param cachesize overall cachesize to use * @param maxrows maximum number of rows to allow in the set * @param rstype the type of result sets to produce * @param rsconcur the concurrency of result sets to produce */ public ResponseList(int cachesize, int maxrows, int rstype, int rsconcur) throws SQLException { this.cachesize = cachesize; this.maxrows = maxrows; this.rstype = rstype; this.rsconcur = rsconcur; responses = new ArrayList<>(); curResponse = -1; seqnr = SeqCounter++; } /** * Retrieves the next available response, or null if there are * no more responses. * * @return the next Response available or null */ public IResponse getNextResponse() throws SQLException { if (rstype == ResultSet.TYPE_FORWARD_ONLY) { // free resources if we're running forward only if (curResponse >= 0 && curResponse < responses.size()) { IResponse tmp = responses.get(curResponse); if (tmp != null) tmp.close(); responses.set(curResponse, null); } } curResponse++; if (curResponse >= responses.size()) { // ResponseList is obviously completed so, there are no // more responses return null; } else { // return this response return responses.get(curResponse); } } /** * Closes the Response at index i, if not null. * * @param i the index position of the header to close */ public void closeResponse(int i) { if (i < 0 || i >= responses.size()) return; IResponse tmp = responses.set(i, null); if (tmp != null) tmp.close(); } /** * Closes the current response. */ void closeCurrentResponse() { closeResponse(curResponse); } /** * Closes the current and previous responses. */ void closeCurOldResponses() { for (int i = curResponse; i >= 0; i--) { closeResponse(i); } } /** * Closes this ResponseList by closing all the Responses in this * ResponseList. */ public void close() { for (int i = 0; i < responses.size(); i++) { closeResponse(i); } } /** * Returns whether this ResponseList has still unclosed * Responses. */ boolean hasUnclosedResponses() { for (IResponse r : responses) { if (r != null) return true; } return false; } /** * Executes the query contained in this ResponseList, and * stores the Responses resulting from this query in this * ResponseList. * * @throws SQLException if a database error occurs */ public void processQuery(String query) throws SQLException { executeQuery(server.getQueryHeaderTemplates(), query); } /** * Internal executor of queries. * * @param templ the template to fill in * @param query the query to execute * @throws SQLException if a database error occurs */ @SuppressWarnings("fallthrough") public void executeQuery(String[] templ, String query) throws SQLException { boolean sendThreadInUse = false; String error = null; try { synchronized (server) { // make sure we're ready to send query; read data till we // have the prompt it is possible (and most likely) that we // already have the prompt and do not have to skip any // lines. Ignore errors from previous result sets. in.waitForPrompt(); // {{{ set reply size /** * Change the reply size of the server. If the given * value is the same as the current value known to use, * then ignore this call. If it is set to 0 we get a * prompt after the server sent it's header. */ int size = cachesize == 0 ? DEF_FETCHSIZE : cachesize; size = maxrows != 0 ? Math.min(maxrows, size) : size; // don't do work if it's not needed if (server.getLang() == MonetConnection.LANG_SQL && size != curReplySize && templ != server.getCommandHeaderTemplates()) { sendControlCommand("reply_size " + size); // store the reply size after a successful change curReplySize = size; } // }}} set reply size // If the query is larger than the TCP buffer size, use a // special send thread to avoid deadlock with the server due // to blocking behaviour when the buffer is full. Because // the server will be writing back results to us, it will // eventually block as well when its TCP buffer gets full, // as we are blocking an not consuming from it. The result // is a state where both client and server want to write, // but block. if (query.length() > server.getBlockSize()) { // get a reference to the send thread if (sendThread == null) sendThread = new SendThread(out); // tell it to do some work! sendThread.runQuery(templ, query); sendThreadInUse = true; } else { // this is a simple call, which is a lot cheaper and will // always succeed for small queries. out.writeLine((templ[0] == null ? "" : templ[0] + query + templ[1] == null ? "" : templ[1])); } // go for new results String tmpLine = in.readLine(); int linetype = in.getLineType(); IResponse res = null; while (linetype != AbstractMCLReader.PROMPT) { // each response should start with a start of header // (or error) switch (linetype) { case AbstractMCLReader.SOHEADER: // make the response object, and fill it try { switch (sohp.parse(tmpLine)) { case StartOfHeaderParser.Q_PARSE: throw new MCLParseException("Q_PARSE header not allowed here", 1); case StartOfHeaderParser.Q_TABLE: case StartOfHeaderParser.Q_PREPARE: { int id = sohp.getNextAsInt(); int tuplecount = sohp.getNextAsInt(); int columncount = sohp.getNextAsInt(); int rowcount = sohp.getNextAsInt(); // enforce the maxrows setting if (maxrows != 0 && tuplecount > maxrows) tuplecount = maxrows; res = new ResultSetResponse(id, tuplecount, columncount, rowcount, this, seqnr); // only add this resultset to // the hashmap if it can possibly // have an additional datablock if (rowcount < tuplecount) { if (rsresponses == null) rsresponses = new HashMap<>(); rsresponses.put(id, (ResultSetResponse) res); } } break; case StartOfHeaderParser.Q_UPDATE: res = new UpdateResponse( sohp.getNextAsInt(), // count sohp.getNextAsString() // key-id ); break; case StartOfHeaderParser.Q_SCHEMA: res = new SchemaResponse(); break; case StartOfHeaderParser.Q_TRANS: boolean ac = sohp.getNextAsString().equals("t"); if (autoCommit && ac) { addWarning("Server enabled auto commit " + "mode while local state " + "already was auto commit.", "01M11" ); } autoCommit = ac; res = new AutoCommitResponse(ac); break; case StartOfHeaderParser.Q_BLOCK: { // a new block of results for a // response... int id = sohp.getNextAsInt(); sohp.getNextAsInt(); // columncount int rowcount = sohp.getNextAsInt(); int offset = sohp.getNextAsInt(); ResultSetResponse t = rsresponses.get(id); if (t == null) { error = "M0M12!no ResultSetResponse with id " + id + " found"; break; } DataBlockResponse r = new DataBlockResponse(rowcount, t.getRSType() == ResultSet.TYPE_FORWARD_ONLY); t.addDataBlockResponse(offset, r); res = r; } break; } } catch (MCLParseException e) { error = "M0M10!error while parsing start of header:\n" + e.getMessage() + " found: '" + tmpLine.charAt(e.getErrorOffset()) + "'" + " in: \"" + tmpLine + "\"" + " at pos: " + e.getErrorOffset(); // flush all the rest in.waitForPrompt(); linetype = in.getLineType(); break; } // immediately handle errors after parsing // the header (res may be null) if (error != null) { in.waitForPrompt(); linetype = in.getLineType(); break; } // here we have a res object, which // we can start filling while (res.wantsMore()) { error = res.addLine( in.readLine(), in.getLineType() ); if (error != null) { // right, some protocol violation, // skip the rest of the result error = "M0M10!" + error; in.waitForPrompt(); linetype = in.getLineType(); break; } } if (error != null) break; // it is of no use to store // DataBlockReponses, you never want to // retrieve them directly anyway if (!(res instanceof DataBlockResponse)) responses.add(res); // read the next line (can be prompt, new // result, error, etc.) before we start the // loop over tmpLine = in.readLine(); linetype = in.getLineType(); break; case AbstractMCLReader.INFO: addWarning(tmpLine.substring(1), "01000"); // read the next line (can be prompt, new // result, error, etc.) before we start the // loop over tmpLine = in.readLine(); linetype = in.getLineType(); break; default: // Yeah... in Java this is correct! // we have something we don't // expect/understand, let's make it an error // message tmpLine = "!M0M10!protocol violation, unexpected line: " + tmpLine; // don't break; fall through... case AbstractMCLReader.ERROR: // read everything till the prompt (should be // error) we don't know if we ignore some // garbage here... but the log should reveal // that error = in.waitForPrompt(); linetype = in.getLineType(); if (error != null) { error = tmpLine.substring(1) + "\n" + error; } else { error = tmpLine.substring(1); } break; } } } // if we used the sendThread, make sure it has finished if (sendThreadInUse) { String tmp = sendThread.getErrors(); if (tmp != null) { if (error == null) { error = "08000!" + tmp; } else { error += "\n08000!" + tmp; } } } if (error != null) { SQLException ret = null; String[] errors = error.split("\n"); for (String error1 : errors) { if (ret == null) { ret = new SQLException(error1.substring(6), error1.substring(0, 5)); } else { ret.setNextException(new SQLException( error1.substring(6), error1.substring(0, 5))); } } throw ret; } } catch (SocketTimeoutException e) { close(); // JDBC 4.1 semantics, abort() throw new SQLException("connection timed out", "08M33"); } catch (IOException e) { closed = true; throw new SQLException(e.getMessage() + " (mserver still alive?)", "08000"); } } }