Mercurial > hg > monetdb-java
view src/main/java/nl/cwi/monetdb/mcl/connection/SenderThread.java @ 87:2b5e32efb1a4 embedded
Made all the mappings for the MAPI connection, now it needs to be added on the Embedded connection. Changed the compilation target to 1.8 because of the timezones. Implemented some JDBC methods as well.
author | Pedro Ferreira <pedro.ferreira@monetdbsolutions.com> |
---|---|
date | Tue, 03 Jan 2017 18:50:07 +0000 (2017-01-03) |
parents | a3c686217ca1 |
children | 6f74e01c57da |
line wrap: on
line source
/* * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. * * Copyright 1997 - July 2008 CWI, August 2008 - 2016 MonetDB B.V. */ package nl.cwi.monetdb.mcl.connection; import nl.cwi.monetdb.mcl.protocol.AbstractProtocol; import java.io.IOException; import java.sql.SQLException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * A thread to send a query to the server. When sending large * amounts of data to a server, the output buffer of the underlying * communication socket may overflow. In such case the sending * process blocks. In order to prevent deadlock, it might be * desirable that the driver as a whole does not block. This thread * facilitates the prevention of such 'full block', because this * separate thread only will block.<br /> * This thread is designed for reuse, as thread creation costs are * high. */ public class SenderThread extends Thread { private enum SendThreadStatus { /** The state WAIT represents this thread to be waiting for something to do */ WAIT, /** The state QUERY represents this thread to be executing a query */ QUERY, /** The state SHUTDOWN is the final state that ends this thread */ SHUTDOWN } private String[] templ; private String query; private AbstractProtocol protocol; private String error; private SendThreadStatus state = SendThreadStatus.WAIT; private final Lock sendLock = new ReentrantLock(); private final Condition queryAvailable = sendLock.newCondition(); private final Condition waiting = sendLock.newCondition(); /** * Constructor which immediately starts this thread and sets it into daemon mode. * * @param out the socket to write to */ public SenderThread(AbstractProtocol out) { super("SendThread"); this.setDaemon(true); this.protocol = out; this.start(); } @Override public void run() { this.sendLock.lock(); try { while (true) { while (this.state == SendThreadStatus.WAIT) { try { this.queryAvailable.await(); } catch (InterruptedException e) { // woken up, eh? } } if (this.state == SendThreadStatus.SHUTDOWN) break; // state is QUERY here try { this.protocol.writeNextQuery((templ[0] == null ? "" : templ[0]), query, (templ[1] == null ? "" : templ[1])); } catch (IOException e) { this.error = e.getMessage(); } // update our state, and notify, maybe someone is waiting // for us in throwErrors this.state = SendThreadStatus.WAIT; this.waiting.signal(); } } finally { this.sendLock.unlock(); } } /** * Starts sending the given query over the given socket. Beware that the thread should be finished (can be assured * by calling throwErrors()) before this method is called! * * @param templ the query template * @param query the query itself * @throws SQLException if this SendThread is already in use */ public void runQuery(String[] templ, String query) throws SQLException { this.sendLock.lock(); try { if (this.state != SendThreadStatus.WAIT) { throw new SQLException("Sender Thread already in use or shutting down!", "M0M03"); } this.templ = templ; this.query = query; // let the thread know there is some work to do this.state = SendThreadStatus.QUERY; this.queryAvailable.signal(); } finally { this.sendLock.unlock(); } } /** * Returns errors encountered during the sending process. * * @return the errors or null if none */ public String getErrors() { this.sendLock.lock(); try { // make sure the thread is in WAIT state, not QUERY while (this.state == SendThreadStatus.QUERY) { try { this.waiting.await(); } catch (InterruptedException e) { // just try again } } if (this.state == SendThreadStatus.SHUTDOWN) this.error = "SendThread is shutting down"; } finally { this.sendLock.unlock(); } return error; } /** * Requests this SendThread to stop. */ public void shutdown() { sendLock.lock(); state = SendThreadStatus.SHUTDOWN; sendLock.unlock(); this.interrupt(); // break any wait conditions } }