Mercurial > hg > monetdb-java
view src/main/java/nl/cwi/monetdb/responses/ResultSetResponse.java @ 64:bb0d66ad7dc6 embedded
More done
author | Pedro Ferreira <pedro.ferreira@monetdbsolutions.com> |
---|---|
date | Thu, 01 Dec 2016 16:52:27 +0100 (2016-12-01) |
parents | |
children | e605cdd6373f |
line wrap: on
line source
package nl.cwi.monetdb.responses; import nl.cwi.monetdb.jdbc.MonetConnection; import nl.cwi.monetdb.mcl.io.AbstractMCLReader; import nl.cwi.monetdb.mcl.parser.HeaderLineParser; import nl.cwi.monetdb.mcl.parser.MCLParseException; import java.sql.ResultSet; import java.sql.SQLException; /** * The ResultSetResponse represents a tabular result sent by the * server. This is typically an SQL table. The MAPI headers of the * Response look like: * <pre> * &1 1 28 2 10 * # name, value # name * # varchar, varchar # type * </pre> * there the first line consists out of<br /> * <tt>&"qt" "id" "tc" "cc" "rc"</tt>. */ public class ResultSetResponse implements IResponse { /** The number of columns in this result */ public final int columncount; /** The total number of rows this result set has */ public final int tuplecount; /** The numbers of rows to retrieve per DataBlockResponse */ private int cacheSize; /** The table ID of this result */ public final int id; /** The names of the columns in this result */ private String[] name; /** The types of the columns in this result */ private String[] type; /** The max string length for each column in this result */ private int[] columnLengths; /** The table for each column in this result */ private String[] tableNames; /** The query sequence number */ private final int seqnr; /** A List of result blocks (chunks of size fetchSize/cacheSize) */ private DataBlockResponse[] resultBlocks; /** A bitmap telling whether the headers are set or not */ private boolean[] isSet; /** Whether this Response is closed */ private boolean closed; /** The Connection that we should use when requesting a new block */ private ResponseList parent; /** Whether the fetchSize was explitly set by the user */ private boolean cacheSizeSetExplicitly = false; /** Whether we should send an Xclose command to the server * if we close this Response */ private boolean destroyOnClose; /** the offset to be used on Xexport queries */ private int blockOffset = 0; /** A parser for header lines */ HeaderLineParser hlp; private final static int NAMES = 0; private final static int TYPES = 1; private final static int TABLES = 2; private final static int LENS = 3; /** * Sole constructor, which requires a MonetConnection parent to * be given. * * @param id the ID of the result set * @param tuplecount the total number of tuples in the result set * @param columncount the number of columns in the result set * @param rowcount the number of rows in the current block * @param parent the parent that created this Response and will * supply new result blocks when necessary * @param seq the query sequence number */ ResultSetResponse(int id, int tuplecount, int columncount, int rowcount, ResponseList parent, int seq) throws SQLException { isSet = new boolean[7]; this.parent = parent; if (parent.cachesize == 0) { /* Below we have to calculate how many "chunks" we need * to allocate to store the entire result. However, if * the user didn't set a cache size, as in this case, we * need to stick to our defaults. */ cacheSize = ResponseList.DEF_FETCHSIZE; cacheSizeSetExplicitly = false; } else { cacheSize = parent.cachesize; cacheSizeSetExplicitly = true; } /* So far, so good. Now the problem with EXPLAIN, DOT, etc * queries is, that they don't support any block fetching, * so we need to always fetch everything at once. For that * reason, the cache size is here set to the rowcount if * it's larger, such that we do a full fetch at once. * (Because we always set a reply_size, we can only get a * larger rowcount from the server if it doesn't paginate, * because it's a pseudo SQL result.) */ if (rowcount > cacheSize) cacheSize = rowcount; seqnr = seq; closed = false; destroyOnClose = id > 0 && tuplecount > rowcount; this.id = id; this.tuplecount = tuplecount; this.columncount = columncount; this.resultBlocks = new DataBlockResponse[(tuplecount / cacheSize) + 1]; hlp = server.getHeaderLineParser(columncount); resultBlocks[0] = new DataBlockResponse(rowcount, parent.rstype == ResultSet.TYPE_FORWARD_ONLY); } /** * Parses the given string and changes the value of the matching * header appropriately, or passes it on to the underlying * DataResponse. * * @param tmpLine the string that contains the header * @return a non-null String if the header cannot be parsed or * is unknown */ // {{{ addLine @Override public String addLine(String tmpLine, int linetype) { if (isSet[LENS] && isSet[TYPES] && isSet[TABLES] && isSet[NAMES]) { return resultBlocks[0].addLine(tmpLine, linetype); } if (linetype != AbstractMCLReader.HEADER) return "header expected, got: " + tmpLine; // depending on the name of the header, we continue try { switch (hlp.parse(tmpLine)) { case HeaderLineParser.NAME: name = hlp.values.clone(); isSet[NAMES] = true; break; case HeaderLineParser.LENGTH: columnLengths = hlp.intValues.clone(); isSet[LENS] = true; break; case HeaderLineParser.TYPE: type = hlp.values.clone(); isSet[TYPES] = true; break; case HeaderLineParser.TABLE: tableNames = hlp.values.clone(); isSet[TABLES] = true; break; } } catch (MCLParseException e) { return e.getMessage(); } // all is well return null; } // }}} /** * Returns whether this ResultSetResponse needs more lines. * This method returns true if not all headers are set, or the * first DataBlockResponse reports to want more. */ @Override public boolean wantsMore() { return !(isSet[LENS] && isSet[TYPES] && isSet[TABLES] && isSet[NAMES]) || resultBlocks[0].wantsMore(); } /** * Returns an array of Strings containing the values between * ',\t' separators. * * @param chrLine a character array holding the input data * @param start where the relevant data starts * @param stop where the relevant data stops * @return an array of Strings */ final private String[] getValues(char[] chrLine, int start, int stop) { int elem = 0; String[] values = new String[columncount]; for (int i = start; i < stop; i++) { if (chrLine[i] == '\t' && chrLine[i - 1] == ',') { values[elem++] = new String(chrLine, start, i - 1 - start); start = i + 1; } } // at the left over part values[elem++] = new String(chrLine, start, stop - start); return values; } /** * Adds the given DataBlockResponse to this ResultSetResponse at * the given block position. * * @param offset the offset number of rows for this block * @param rr the DataBlockResponse to add */ void addDataBlockResponse(int offset, DataBlockResponse rr) { int block = (offset - blockOffset) / cacheSize; resultBlocks[block] = rr; } /** * Marks this Response as being completed. A complete Response * needs to be consistent with regard to its internal data. * * @throws SQLException if the data currently in this Response is not * sufficient to be consistant */ @Override public void complete() throws SQLException { String error = ""; if (!isSet[NAMES]) error += "name header missing\n"; if (!isSet[TYPES]) error += "type header missing\n"; if (!isSet[TABLES]) error += "table name header missing\n"; if (!isSet[LENS]) error += "column width header missing\n"; if (!error.equals("")) throw new SQLException(error, "M0M10"); } /** * Returns the names of the columns * * @return the names of the columns */ String[] getNames() { return name; } /** * Returns the types of the columns * * @return the types of the columns */ String[] getTypes() { return type; } /** * Returns the tables of the columns * * @return the tables of the columns */ String[] getTableNames() { return tableNames; } /** * Returns the lengths of the columns * * @return the lengths of the columns */ int[] getColumnLengths() { return columnLengths; } /** * Returns the cache size used within this Response * * @return the cache size */ int getCacheSize() { return cacheSize; } /** * Returns the current block offset * * @return the current block offset */ int getBlockOffset() { return blockOffset; } /** * Returns the ResultSet type, FORWARD_ONLY or not. * * @return the ResultSet type */ int getRSType() { return parent.rstype; } /** * Returns the concurrency of the ResultSet. * * @return the ResultSet concurrency */ int getRSConcur() { return parent.rsconcur; } /** * Returns a line from the cache. If the line is already present in the * cache, it is returned, if not apropriate actions are taken to make * sure the right block is being fetched and as soon as the requested * line is fetched it is returned. * * @param row the row in the result set to return * @return the exact row read as requested or null if the requested row * is out of the scope of the result set * @throws SQLException if an database error occurs */ String getLine(int row) throws SQLException { if (row >= tuplecount || row < 0) return null; int block = (row - blockOffset) / cacheSize; int blockLine = (row - blockOffset) % cacheSize; // do we have the right block loaded? (optimistic try) DataBlockResponse rawr; // load block if appropriate if ((rawr = resultBlocks[block]) == null) { /// TODO: ponder about a maximum number of blocks to keep /// in memory when dealing with random access to /// reduce memory blow-up // if we're running forward only, we can discard the oldmapi // block loaded if (parent.rstype == ResultSet.TYPE_FORWARD_ONLY) { for (int i = 0; i < block; i++) resultBlocks[i] = null; if (ResponseList.SeqCounter - 1 == seqnr && !cacheSizeSetExplicitly && tuplecount - row > cacheSize && cacheSize < ResponseList.DEF_FETCHSIZE * 10) { // there has no query been issued after this // one, so we can consider this an uninterrupted // continuation request. Let's once increase // the cacheSize as it was not explicitly set, // since the chances are high that we won't // bother anyone else by doing so, and just // gaining some performance. // store the previous position in the // blockOffset variable blockOffset += cacheSize; // increase the cache size (a lot) cacheSize *= 10; // by changing the cacheSize, we also // change the block measures. Luckily // we don't care about previous blocks // because we have a forward running // pointer only. However, we do have // to recalculate the block number, to // ensure the next call to find this // new block. block = (row - blockOffset) / cacheSize; blockLine = (row - blockOffset) % cacheSize; } } // ok, need to fetch cache block first parent.executeQuery(server.getCommandHeaderTemplates(), "export " + id + " " + ((block * cacheSize) + blockOffset) + " " + cacheSize); rawr = resultBlocks[block]; if (rawr == null) throw new AssertionError("block " + block + " should have been fetched by now :("); } return rawr.getRow(blockLine); } /** * Closes this Response by sending an Xclose to the server indicating * that the result can be closed at the server side as well. */ @Override public void close() { if (closed) return; // send command to server indicating we're done with this // result only if we had an ID in the header and this result // was larger than the reply size try { if (destroyOnClose) sendControlCommand("close " + id); } catch (SQLException e) { // probably a connection error... } // close the data block associated with us for (int i = 1; i < resultBlocks.length; i++) { DataBlockResponse r = resultBlocks[i]; if (r != null) r.close(); } closed = true; } /** * Returns whether this Response is closed * * @return whether this Response is closed */ boolean isClosed() { return closed; } }