Mercurial > hg > monetdb-java
view src/main/java/nl/cwi/monetdb/mcl/connection/SendThread.java @ 68:86967be24645 embedded
Ready to start testing the old mapi connection. After passing the tests. The embedded integration will be very straightforward.
author | Pedro Ferreira <pedro.ferreira@monetdbsolutions.com> |
---|---|
date | Wed, 07 Dec 2016 15:59:27 +0100 (2016-12-07) |
parents | 7307caacc2d5 |
children | 4e2a2a81cc6a |
line wrap: on
line source
package nl.cwi.monetdb.mcl.connection; import nl.cwi.monetdb.mcl.protocol.AbstractProtocol; import java.io.IOException; import java.sql.SQLException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * A thread to send a query to the server. When sending large * amounts of data to a server, the output buffer of the underlying * communication socket may overflow. In such case the sending * process blocks. In order to prevent deadlock, it might be * desirable that the driver as a whole does not block. This thread * facilitates the prevention of such 'full block', because this * separate thread only will block.<br /> * This thread is designed for reuse, as thread creation costs are * high. */ public class SendThread extends Thread { private enum SendThreadStatus { /** The state WAIT represents this thread to be waiting for something to do */ WAIT, /** The state QUERY represents this thread to be executing a query */ QUERY, /** The state SHUTDOWN is the final state that ends this thread */ SHUTDOWN } private byte[][] templ; private byte[] query; private AbstractProtocol protocol; private String error; private SendThreadStatus state = SendThreadStatus.WAIT; private final Lock sendLock = new ReentrantLock(); private final Condition queryAvailable = sendLock.newCondition(); private final Condition waiting = sendLock.newCondition(); /** * Constructor which immediately starts this thread and sets it into daemon mode. * * @param out the socket to write to */ public SendThread(AbstractProtocol out) { super("SendThread"); this.setDaemon(true); this.protocol = out; this.start(); } @Override public void run() { sendLock.lock(); try { while (true) { while (state == SendThreadStatus.WAIT) { try { queryAvailable.await(); } catch (InterruptedException e) { // woken up, eh? } } if (state == SendThreadStatus.SHUTDOWN) break; // state is QUERY here try { protocol.writeNextCommand((templ[0] == null ? MonetDBLanguage.EmptyString : templ[0]), query, (templ[1] == null ? MonetDBLanguage.EmptyString : templ[1])); } catch (IOException e) { error = e.getMessage(); } // update our state, and notify, maybe someone is waiting // for us in throwErrors state = SendThreadStatus.WAIT; waiting.signal(); } } finally { sendLock.unlock(); } } /** * Starts sending the given query over the given socket. Beware that the thread should be finished (can be assured * by calling throwErrors()) before this method is called! * * @param templ the query template * @param query the query itself * @throws SQLException if this SendThread is already in use */ public void runQuery(byte[][] templ, String query) throws SQLException { sendLock.lock(); try { if (state != SendThreadStatus.WAIT) { throw new SQLException("SendThread already in use or shutting down!", "M0M03"); } this.templ = templ; this.query = query.getBytes(); // let the thread know there is some work to do state = SendThreadStatus.QUERY; queryAvailable.signal(); } finally { sendLock.unlock(); } } /** * Returns errors encountered during the sending process. * * @return the errors or null if none */ public String getErrors() { sendLock.lock(); try { // make sure the thread is in WAIT state, not QUERY while (state == SendThreadStatus.QUERY) { try { waiting.await(); } catch (InterruptedException e) { // just try again } } if (state == SendThreadStatus.SHUTDOWN) error = "SendThread is shutting down"; } finally { sendLock.unlock(); } return error; } /** * Requests this SendThread to stop. */ public void shutdown() { sendLock.lock(); state = SendThreadStatus.SHUTDOWN; sendLock.unlock(); this.interrupt(); // break any wait conditions } }