comparison src/main/java/nl/cwi/monetdb/jdbc/MonetConnection.java @ 285:637899bda602

Removed the SendThread class and its usage from MonetConnection. It appeared that the use of a SendThread object (per connection) sometimes resulted in a non-responding/hanging JDBC connection, see bugs 6571 and 6693. Tests with large query scripts and large returned results show that the SendThread class is not needed for the JDBC driver to function properly. Also, the submitter of bug 6693 verified that the removal of SendThread resolved the hanging JDBC-driver problem without any side effects.
author Martin van Dinther <martin.van.dinther@monetdbsolutions.com>
date Thu, 25 Jul 2019 16:08:27 +0200 (2019-07-25)
parents e5b99c929a2d
children d430f8adbf1b
comparison
equal deleted inserted replaced
284:e5b99c929a2d 285:637899bda602
400 // better luck next time! 400 // better luck next time!
401 } 401 }
402 } 402 }
403 // close the socket 403 // close the socket
404 server.close(); 404 server.close();
405 // close active SendThread if any
406 if (sendThread != null) {
407 sendThread.shutdown();
408 sendThread = null;
409 }
410 // report ourselves as closed 405 // report ourselves as closed
411 closed = true; 406 closed = true;
412 } 407 }
413 } 408 }
414 409
1859 1854
1860 /** the default number of rows that are (attempted to) read at once */ 1855 /** the default number of rows that are (attempted to) read at once */
1861 private static final int DEF_FETCHSIZE = 250; 1856 private static final int DEF_FETCHSIZE = 250;
1862 /** The sequence counter */ 1857 /** The sequence counter */
1863 private static int seqCounter = 0; 1858 private static int seqCounter = 0;
1864
1865 /** An optional thread that is used for sending large queries */
1866 private SendThread sendThread = null;
1867 1859
1868 /** 1860 /**
1869 * A Response is a message sent by the server to indicate some 1861 * A Response is a message sent by the server to indicate some
1870 * action has taken place, and possible results of that action. 1862 * action has taken place, and possible results of that action.
1871 */ 1863 */
2691 */ 2683 */
2692 @SuppressWarnings("fallthrough") 2684 @SuppressWarnings("fallthrough")
2693 void executeQuery(String[] templ, String query) 2685 void executeQuery(String[] templ, String query)
2694 throws SQLException 2686 throws SQLException
2695 { 2687 {
2696 boolean sendThreadInUse = false;
2697 String error = null; 2688 String error = null;
2698 2689
2699 try { 2690 try {
2700 synchronized (server) { 2691 synchronized (server) {
2701 // make sure we're ready to send query; read data till we 2692 // make sure we're ready to send query; read data till we
2721 // store the reply size after a successful change 2712 // store the reply size after a successful change
2722 curReplySize = size; 2713 curReplySize = size;
2723 } 2714 }
2724 // }}} set reply size 2715 // }}} set reply size
2725 2716
2726 // If the query is larger than the TCP buffer size, use a 2717 // send query to the server
2727 // special send thread to avoid deadlock with the server due 2718 out.writeLine( (templ[0] == null ? "" : templ[0]) + query + (templ[1] == null ? "" : templ[1]) );
2728 // to blocking behaviour when the buffer is full. Because
2729 // the server will be writing back results to us, it will
2730 // eventually block as well when its TCP buffer gets full,
2731 // as we are blocking and not consuming from it. The result
2732 // is a state where both client and server want to write,
2733 // but block.
2734 if (query.length() > MapiSocket.BLOCK) {
2735 // get a reference to the send thread
2736 if (sendThread == null)
2737 sendThread = new SendThread(out);
2738 // tell it to do some work!
2739 sendThread.runQuery(templ, query);
2740 sendThreadInUse = true;
2741 } else {
2742 // this is a simple call, which is a lot cheaper and will
2743 // always succeed for small queries.
2744 out.writeLine(
2745 (templ[0] == null ? "" : templ[0]) +
2746 query +
2747 (templ[1] == null ? "" : templ[1]));
2748 }
2749 2719
2750 // go for new results 2720 // go for new results
2751 String tmpLine = in.readLine(); 2721 String tmpLine = in.readLine();
2752 int linetype = in.getLineType(); 2722 int linetype = in.getLineType();
2753 Response res = null; 2723 Response res = null;
2879 break; 2849 break;
2880 } // end of switch (linetype) 2850 } // end of switch (linetype)
2881 } // end of while (linetype != BufferedMCLReader.PROMPT) 2851 } // end of while (linetype != BufferedMCLReader.PROMPT)
2882 } // end of synchronized (server) 2852 } // end of synchronized (server)
2883 2853
2884 // if we used the sendThread, make sure it has finished
2885 if (sendThreadInUse) {
2886 String tmp = sendThread.getErrors();
2887 if (tmp != null) {
2888 if (error == null) {
2889 error = "08000!" + tmp;
2890 } else {
2891 error += "\n08000!" + tmp;
2892 }
2893 }
2894 }
2895 if (error != null) { 2854 if (error != null) {
2896 SQLException ret = null; 2855 SQLException ret = null;
2897 String[] errors = error.split("\n"); 2856 String[] errors = error.split("\n");
2898 for (int i = 0; i < errors.length; i++) { 2857 for (int i = 0; i < errors.length; i++) {
2899 SQLException newErr; 2858 SQLException newErr;
2918 throw new SQLNonTransientConnectionException(e.getMessage() + " (mserver5 still alive?)", "08006"); 2877 throw new SQLNonTransientConnectionException(e.getMessage() + " (mserver5 still alive?)", "08006");
2919 } 2878 }
2920 } 2879 }
2921 } 2880 }
2922 // }}} 2881 // }}}
2923
2924 /**
2925 * A thread to send a query to the server. When sending large
2926 * amounts of data to a server, the output buffer of the underlying
2927 * communication socket may overflow. In such case the sending
2928 * process blocks. In order to prevent deadlock, it might be
2929 * desirable that the driver as a whole does not block. This thread
2930 * facilitates the prevention of such 'full block', because this
2931 * separate thread only will block.<br />
2932 * This thread is designed for reuse, as thread creation costs are
2933 * high.
2934 */
2935 // {{{ SendThread class implementation
2936 static class SendThread extends Thread {
2937 /** The state WAIT represents this thread to be waiting for
2938 * something to do */
2939 private static final int WAIT = 0;
2940 /** The state QUERY represents this thread to be executing a query */
2941 private static final int QUERY = 1;
2942 /** The state SHUTDOWN is the final state that ends this thread */
2943 private static final int SHUTDOWN = -1;
2944
2945 private String[] templ;
2946 private String query;
2947 private BufferedMCLWriter out;
2948 private String error;
2949 private int state = WAIT;
2950
2951 final Lock sendLock = new ReentrantLock();
2952 final Condition queryAvailable = sendLock.newCondition();
2953 final Condition waiting = sendLock.newCondition();
2954
2955 /**
2956 * Constructor which immediately starts this thread and sets it
2957 * into daemon mode.
2958 *
2959 * @param monet the socket to write to
2960 */
2961 public SendThread(BufferedMCLWriter out) {
2962 super("SendThread");
2963 setDaemon(true);
2964 this.out = out;
2965 start();
2966 }
2967
2968 @Override
2969 public void run() {
2970 sendLock.lock();
2971 try {
2972 while (true) {
2973 while (state == WAIT) {
2974 try {
2975 queryAvailable.await();
2976 } catch (InterruptedException e) {
2977 // woken up, eh?
2978 }
2979 }
2980 if (state == SHUTDOWN)
2981 break;
2982
2983 // state is QUERY here
2984 try {
2985 out.writeLine(
2986 (templ[0] == null ? "" : templ[0]) +
2987 query +
2988 (templ[1] == null ? "" : templ[1]));
2989 } catch (IOException e) {
2990 error = e.getMessage();
2991 }
2992
2993 // update our state, and notify, maybe someone is waiting
2994 // for us in throwErrors
2995 state = WAIT;
2996 waiting.signal();
2997 }
2998 } finally {
2999 sendLock.unlock();
3000 }
3001 }
3002
3003 /**
3004 * Starts sending the given query over the given socket. Beware
3005 * that the thread should be finished (can be assured by calling
3006 * throwErrors()) before this method is called!
3007 *
3008 * @param templ the query template
3009 * @param query the query itself
3010 * @throws SQLException if this SendThread is already in use
3011 */
3012 public void runQuery(String[] templ, String query) throws SQLException {
3013 sendLock.lock();
3014 try {
3015 if (state != WAIT)
3016 throw new SQLException("SendThread already in use or shutting down!", "M0M03");
3017
3018 this.templ = templ;
3019 this.query = query;
3020
3021 // let the thread know there is some work to do
3022 state = QUERY;
3023 queryAvailable.signal();
3024 } finally {
3025 sendLock.unlock();
3026 }
3027 }
3028
3029 /**
3030 * Returns errors encountered during the sending process.
3031 *
3032 * @return the errors or null if none
3033 */
3034 public String getErrors() {
3035 sendLock.lock();
3036 try {
3037 // make sure the thread is in WAIT state, not QUERY
3038 while (state == QUERY) {
3039 try {
3040 waiting.await();
3041 } catch (InterruptedException e) {
3042 // just try again
3043 }
3044 }
3045 if (state == SHUTDOWN)
3046 error = "SendThread is shutting down";
3047 } finally {
3048 sendLock.unlock();
3049 }
3050 return error;
3051 }
3052
3053 /**
3054 * Requests this SendThread to stop.
3055 */
3056 public void shutdown() {
3057 sendLock.lock();
3058 state = SHUTDOWN;
3059 sendLock.unlock();
3060 this.interrupt(); // break any wait conditions
3061 }
3062 }
3063 // }}}
3064 } 2882 }