Mercurial > hg > monetdb-java
comparison src/main/java/nl/cwi/monetdb/jdbc/MonetConnection.java @ 285:637899bda602
Removed SendThread class and its usage from MonetConnection.
It appeared that the use of a SendThread object (per connection) sometimes resulted in a non-responding/hanging JDBC connection; see bugs 6571 and 6693.
Tests with large query scripts and large returned results show that the SendThread class is not needed for the JDBC driver to function properly.
Also, the submitter of bug 6693 verified that the removal of SendThread resolved the hanging JDBC driver problem without any side effects.
author | Martin van Dinther <martin.van.dinther@monetdbsolutions.com> |
---|---|
date | Thu, 25 Jul 2019 16:08:27 +0200 (2019-07-25) |
parents | e5b99c929a2d |
children | d430f8adbf1b |
comparison
equal
deleted
inserted
replaced
284:e5b99c929a2d | 285:637899bda602 |
---|---|
400 // better luck next time! | 400 // better luck next time! |
401 } | 401 } |
402 } | 402 } |
403 // close the socket | 403 // close the socket |
404 server.close(); | 404 server.close(); |
405 // close active SendThread if any | |
406 if (sendThread != null) { | |
407 sendThread.shutdown(); | |
408 sendThread = null; | |
409 } | |
410 // report ourselves as closed | 405 // report ourselves as closed |
411 closed = true; | 406 closed = true; |
412 } | 407 } |
413 } | 408 } |
414 | 409 |
1859 | 1854 |
1860 /** the default number of rows that are (attempted to) read at once */ | 1855 /** the default number of rows that are (attempted to) read at once */ |
1861 private static final int DEF_FETCHSIZE = 250; | 1856 private static final int DEF_FETCHSIZE = 250; |
1862 /** The sequence counter */ | 1857 /** The sequence counter */ |
1863 private static int seqCounter = 0; | 1858 private static int seqCounter = 0; |
1864 | |
1865 /** An optional thread that is used for sending large queries */ | |
1866 private SendThread sendThread = null; | |
1867 | 1859 |
1868 /** | 1860 /** |
1869 * A Response is a message sent by the server to indicate some | 1861 * A Response is a message sent by the server to indicate some |
1870 * action has taken place, and possible results of that action. | 1862 * action has taken place, and possible results of that action. |
1871 */ | 1863 */ |
2691 */ | 2683 */ |
2692 @SuppressWarnings("fallthrough") | 2684 @SuppressWarnings("fallthrough") |
2693 void executeQuery(String[] templ, String query) | 2685 void executeQuery(String[] templ, String query) |
2694 throws SQLException | 2686 throws SQLException |
2695 { | 2687 { |
2696 boolean sendThreadInUse = false; | |
2697 String error = null; | 2688 String error = null; |
2698 | 2689 |
2699 try { | 2690 try { |
2700 synchronized (server) { | 2691 synchronized (server) { |
2701 // make sure we're ready to send query; read data till we | 2692 // make sure we're ready to send query; read data till we |
2721 // store the reply size after a successful change | 2712 // store the reply size after a successful change |
2722 curReplySize = size; | 2713 curReplySize = size; |
2723 } | 2714 } |
2724 // }}} set reply size | 2715 // }}} set reply size |
2725 | 2716 |
2726 // If the query is larger than the TCP buffer size, use a | 2717 // send query to the server |
2727 // special send thread to avoid deadlock with the server due | 2718 out.writeLine( (templ[0] == null ? "" : templ[0]) + query + (templ[1] == null ? "" : templ[1]) ); |
2728 // to blocking behaviour when the buffer is full. Because | |
2729 // the server will be writing back results to us, it will | |
2730 // eventually block as well when its TCP buffer gets full, | |
2731 // as we are blocking and not consuming from it. The result | |
2732 // is a state where both client and server want to write, | |
2733 // but block. | |
2734 if (query.length() > MapiSocket.BLOCK) { | |
2735 // get a reference to the send thread | |
2736 if (sendThread == null) | |
2737 sendThread = new SendThread(out); | |
2738 // tell it to do some work! | |
2739 sendThread.runQuery(templ, query); | |
2740 sendThreadInUse = true; | |
2741 } else { | |
2742 // this is a simple call, which is a lot cheaper and will | |
2743 // always succeed for small queries. | |
2744 out.writeLine( | |
2745 (templ[0] == null ? "" : templ[0]) + | |
2746 query + | |
2747 (templ[1] == null ? "" : templ[1])); | |
2748 } | |
2749 | 2719 |
2750 // go for new results | 2720 // go for new results |
2751 String tmpLine = in.readLine(); | 2721 String tmpLine = in.readLine(); |
2752 int linetype = in.getLineType(); | 2722 int linetype = in.getLineType(); |
2753 Response res = null; | 2723 Response res = null; |
2879 break; | 2849 break; |
2880 } // end of switch (linetype) | 2850 } // end of switch (linetype) |
2881 } // end of while (linetype != BufferedMCLReader.PROMPT) | 2851 } // end of while (linetype != BufferedMCLReader.PROMPT) |
2882 } // end of synchronized (server) | 2852 } // end of synchronized (server) |
2883 | 2853 |
2884 // if we used the sendThread, make sure it has finished | |
2885 if (sendThreadInUse) { | |
2886 String tmp = sendThread.getErrors(); | |
2887 if (tmp != null) { | |
2888 if (error == null) { | |
2889 error = "08000!" + tmp; | |
2890 } else { | |
2891 error += "\n08000!" + tmp; | |
2892 } | |
2893 } | |
2894 } | |
2895 if (error != null) { | 2854 if (error != null) { |
2896 SQLException ret = null; | 2855 SQLException ret = null; |
2897 String[] errors = error.split("\n"); | 2856 String[] errors = error.split("\n"); |
2898 for (int i = 0; i < errors.length; i++) { | 2857 for (int i = 0; i < errors.length; i++) { |
2899 SQLException newErr; | 2858 SQLException newErr; |
2918 throw new SQLNonTransientConnectionException(e.getMessage() + " (mserver5 still alive?)", "08006"); | 2877 throw new SQLNonTransientConnectionException(e.getMessage() + " (mserver5 still alive?)", "08006"); |
2919 } | 2878 } |
2920 } | 2879 } |
2921 } | 2880 } |
2922 // }}} | 2881 // }}} |
2923 | |
2924 /** | |
2925 * A thread to send a query to the server. When sending large | |
2926 * amounts of data to a server, the output buffer of the underlying | |
2927 * communication socket may overflow. In such case the sending | |
2928 * process blocks. In order to prevent deadlock, it might be | |
2929 * desirable that the driver as a whole does not block. This thread | |
2930 * facilitates the prevention of such 'full block', because this | |
2931 * separate thread only will block.<br /> | |
2932 * This thread is designed for reuse, as thread creation costs are | |
2933 * high. | |
2934 */ | |
2935 // {{{ SendThread class implementation | |
2936 static class SendThread extends Thread { | |
2937 /** The state WAIT represents this thread to be waiting for | |
2938 * something to do */ | |
2939 private static final int WAIT = 0; | |
2940 /** The state QUERY represents this thread to be executing a query */ | |
2941 private static final int QUERY = 1; | |
2942 /** The state SHUTDOWN is the final state that ends this thread */ | |
2943 private static final int SHUTDOWN = -1; | |
2944 | |
2945 private String[] templ; | |
2946 private String query; | |
2947 private BufferedMCLWriter out; | |
2948 private String error; | |
2949 private int state = WAIT; | |
2950 | |
2951 final Lock sendLock = new ReentrantLock(); | |
2952 final Condition queryAvailable = sendLock.newCondition(); | |
2953 final Condition waiting = sendLock.newCondition(); | |
2954 | |
2955 /** | |
2956 * Constructor which immediately starts this thread and sets it | |
2957 * into daemon mode. | |
2958 * | |
2959 * @param monet the socket to write to | |
2960 */ | |
2961 public SendThread(BufferedMCLWriter out) { | |
2962 super("SendThread"); | |
2963 setDaemon(true); | |
2964 this.out = out; | |
2965 start(); | |
2966 } | |
2967 | |
2968 @Override | |
2969 public void run() { | |
2970 sendLock.lock(); | |
2971 try { | |
2972 while (true) { | |
2973 while (state == WAIT) { | |
2974 try { | |
2975 queryAvailable.await(); | |
2976 } catch (InterruptedException e) { | |
2977 // woken up, eh? | |
2978 } | |
2979 } | |
2980 if (state == SHUTDOWN) | |
2981 break; | |
2982 | |
2983 // state is QUERY here | |
2984 try { | |
2985 out.writeLine( | |
2986 (templ[0] == null ? "" : templ[0]) + | |
2987 query + | |
2988 (templ[1] == null ? "" : templ[1])); | |
2989 } catch (IOException e) { | |
2990 error = e.getMessage(); | |
2991 } | |
2992 | |
2993 // update our state, and notify, maybe someone is waiting | |
2994 // for us in throwErrors | |
2995 state = WAIT; | |
2996 waiting.signal(); | |
2997 } | |
2998 } finally { | |
2999 sendLock.unlock(); | |
3000 } | |
3001 } | |
3002 | |
3003 /** | |
3004 * Starts sending the given query over the given socket. Beware | |
3005 * that the thread should be finished (can be assured by calling | |
3006 * throwErrors()) before this method is called! | |
3007 * | |
3008 * @param templ the query template | |
3009 * @param query the query itself | |
3010 * @throws SQLException if this SendThread is already in use | |
3011 */ | |
3012 public void runQuery(String[] templ, String query) throws SQLException { | |
3013 sendLock.lock(); | |
3014 try { | |
3015 if (state != WAIT) | |
3016 throw new SQLException("SendThread already in use or shutting down!", "M0M03"); | |
3017 | |
3018 this.templ = templ; | |
3019 this.query = query; | |
3020 | |
3021 // let the thread know there is some work to do | |
3022 state = QUERY; | |
3023 queryAvailable.signal(); | |
3024 } finally { | |
3025 sendLock.unlock(); | |
3026 } | |
3027 } | |
3028 | |
3029 /** | |
3030 * Returns errors encountered during the sending process. | |
3031 * | |
3032 * @return the errors or null if none | |
3033 */ | |
3034 public String getErrors() { | |
3035 sendLock.lock(); | |
3036 try { | |
3037 // make sure the thread is in WAIT state, not QUERY | |
3038 while (state == QUERY) { | |
3039 try { | |
3040 waiting.await(); | |
3041 } catch (InterruptedException e) { | |
3042 // just try again | |
3043 } | |
3044 } | |
3045 if (state == SHUTDOWN) | |
3046 error = "SendThread is shutting down"; | |
3047 } finally { | |
3048 sendLock.unlock(); | |
3049 } | |
3050 return error; | |
3051 } | |
3052 | |
3053 /** | |
3054 * Requests this SendThread to stop. | |
3055 */ | |
3056 public void shutdown() { | |
3057 sendLock.lock(); | |
3058 state = SHUTDOWN; | |
3059 sendLock.unlock(); | |
3060 this.interrupt(); // break any wait conditions | |
3061 } | |
3062 } | |
3063 // }}} | |
3064 } | 2882 } |