Mercurial > hg > monetdb-java
view src/main/java/nl/cwi/monetdb/mcl/connection/SenderThread.java @ 81:a3c686217ca1 embedded
Made many fixes for the embedded connection
author | Pedro Ferreira <pedro.ferreira@monetdbsolutions.com> |
---|---|
date | Fri, 16 Dec 2016 18:41:09 +0100 (2016-12-16) |
parents | 17365ed26611 |
children | 2b5e32efb1a4 |
line wrap: on
line source
package nl.cwi.monetdb.mcl.connection;

import nl.cwi.monetdb.mcl.protocol.AbstractProtocol;

import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A thread to send a query to the server. When sending large
 * amounts of data to a server, the output buffer of the underlying
 * communication socket may overflow. In such case the sending
 * process blocks. In order to prevent deadlock, it might be
 * desirable that the driver as a whole does not block. This thread
 * facilitates the prevention of such 'full block', because this
 * separate thread only will block.<br />
 * This thread is designed for reuse, as thread creation costs are
 * high.
 * <p>
 * All mutable state ({@code templ}, {@code query}, {@code error} and
 * {@code state}) is guarded by {@code sendLock}. The sender thread keeps
 * that lock for the whole duration of a write and only releases it while
 * it is waiting for work, so {@link #runQuery(String[], String)},
 * {@link #getErrors()} and {@link #shutdown()} block while a write is in
 * progress.
 */
public class SenderThread extends Thread {

    private enum SendThreadStatus {
        /** The state WAIT represents this thread to be waiting for something to do */
        WAIT,
        /** The state QUERY represents this thread to be executing a query */
        QUERY,
        /** The state SHUTDOWN is the final state that ends this thread */
        SHUTDOWN
    }

    /** Query template: index 0 is written before the query, index 1 after it. */
    private String[] templ;
    /** The query to send with the next write. */
    private String query;
    /** The protocol the queries are written to. */
    private final AbstractProtocol protocol;
    /** Description of the failure of the most recently accepted query, or null if there was none. */
    private String error;
    private SendThreadStatus state = SendThreadStatus.WAIT;

    private final Lock sendLock = new ReentrantLock();
    /** Signalled when a query has been handed over, or when shutdown is requested. */
    private final Condition queryAvailable = sendLock.newCondition();
    /** Signalled when the state leaves QUERY, so callers blocked in getErrors() can continue. */
    private final Condition waiting = sendLock.newCondition();

    /**
     * Constructor which immediately starts this thread and sets it into daemon mode.
     *
     * @param out the socket to write to
     */
    public SenderThread(AbstractProtocol out) {
        super("SendThread");
        this.setDaemon(true);
        this.protocol = out;
        // started last, so every field above is assigned before run() can observe it
        this.start();
    }

    /**
     * Waits for queries handed over by {@link #runQuery(String[], String)} and writes them to the protocol until
     * {@link #shutdown()} is requested. A failure while writing is recorded and reported by {@link #getErrors()}.
     */
    @Override
    public void run() {
        this.sendLock.lock();
        try {
            while (true) {
                while (this.state == SendThreadStatus.WAIT) {
                    try {
                        this.queryAvailable.await();
                    } catch (InterruptedException e) {
                        // interrupted (e.g. by shutdown()); the loop condition re-checks the state
                    }
                }
                if (this.state == SendThreadStatus.SHUTDOWN) {
                    break;
                }

                // state is QUERY here
                try {
                    this.protocol.writeNextQuery(
                            (this.templ[0] == null ? "" : this.templ[0]),
                            this.query,
                            (this.templ[1] == null ? "" : this.templ[1]));
                } catch (IOException e) {
                    this.error = describe(e);
                } catch (RuntimeException e) {
                    // e.g. a null or too short template: report it instead of silently killing this
                    // thread, which would leave the state at QUERY and block getErrors() forever
                    this.error = describe(e);
                }

                // drop the references so a large query can be reclaimed while we are idle
                this.templ = null;
                this.query = null;

                // update our state, and notify everybody who may be waiting for us in getErrors()
                this.state = SendThreadStatus.WAIT;
                this.waiting.signalAll();
            }
        } finally {
            // Whatever made us leave the loop, no query will be serviced anymore: refuse new work
            // and release every caller that is still blocked in getErrors().
            this.state = SendThreadStatus.SHUTDOWN;
            this.waiting.signalAll();
            this.sendLock.unlock();
        }
    }

    /**
     * Starts sending the given query over the given socket. Beware that the thread should be finished (can be assured
     * by calling getErrors()) before this method is called! Any error recorded for a previous query is discarded once
     * the new query has been accepted.
     *
     * @param templ the query template; element 0 is sent before the query and element 1 after it, a null element is
     *              sent as the empty string
     * @param query the query itself
     * @throws SQLException if this SendThread is already in use
     */
    public void runQuery(String[] templ, String query) throws SQLException {
        this.sendLock.lock();
        try {
            if (this.state != SendThreadStatus.WAIT) {
                throw new SQLException("Sender Thread already in use or shutting down!", "M0M03");
            }

            this.templ = templ;
            this.query = query;
            // errors are reported per query; do not let an old failure leak into this one
            this.error = null;

            // let the thread know there is some work to do
            this.state = SendThreadStatus.QUERY;
            this.queryAvailable.signal();
        } finally {
            this.sendLock.unlock();
        }
    }

    /**
     * Returns errors encountered during the sending process. Blocks until the sender thread is no longer in the QUERY
     * state.
     *
     * @return the errors or null if none
     */
    public String getErrors() {
        this.sendLock.lock();
        try {
            // make sure the thread is in WAIT state, not QUERY
            while (this.state == SendThreadStatus.QUERY) {
                try {
                    this.waiting.await();
                } catch (InterruptedException e) {
                    // just try again; the interrupt flag is deliberately not restored here, as that
                    // would make the next await() fail immediately and turn this loop into a spin
                }
            }
            if (this.state == SendThreadStatus.SHUTDOWN) {
                this.error = "SendThread is shutting down";
            }
            // read while still holding the lock, so the value written by the sender thread is visible
            return this.error;
        } finally {
            this.sendLock.unlock();
        }
    }

    /**
     * Requests this SendThread to stop.
     */
    public void shutdown() {
        this.sendLock.lock();
        try {
            this.state = SendThreadStatus.SHUTDOWN;
            // wake the sender thread as well as any caller blocked in getErrors()
            this.queryAvailable.signalAll();
            this.waiting.signalAll();
        } finally {
            this.sendLock.unlock();
        }
        this.interrupt(); // break any wait conditions
    }

    /**
     * Builds a non-null description of a failure, because an exception message may be null and a null error would be
     * indistinguishable from success.
     *
     * @param failure the exception to describe
     * @return the exception message, or its string representation if it has no message
     */
    private static String describe(Throwable failure) {
        String message = failure.getMessage();
        return message != null ? message : failure.toString();
    }
}