comparison src/main/java/org/monetdb/mcl/net/MapiSocket.java @ 391:f523727db392

Moved Java classes from packages starting with nl.cwi.monetdb.* to package org.monetdb.* This naming complies to the Java Package Naming convention as MonetDB's main website is www.monetdb.org.
author Martin van Dinther <martin.van.dinther@monetdbsolutions.com>
date Thu, 12 Nov 2020 22:02:01 +0100 (2020-11-12)
parents src/main/java/nl/cwi/monetdb/mcl/net/MapiSocket.java@af6db116238d
children bf9f6b6ecf40
comparison
equal deleted inserted replaced
390:6199e0be3c6e 391:f523727db392
1 /*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2020 MonetDB B.V.
7 */
8
9 package org.monetdb.mcl.net;
10
11 import java.io.BufferedInputStream;
12 import java.io.BufferedOutputStream;
13 import java.io.FileWriter;
14 import java.io.FilterInputStream;
15 import java.io.FilterOutputStream;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.io.UnsupportedEncodingException;
20 import java.io.Writer;
21 import java.net.Socket;
22 import java.net.SocketException;
23 import java.net.UnknownHostException;
24 import java.net.URI;
25 import java.security.MessageDigest;
26 import java.security.NoSuchAlgorithmException;
27 import java.util.ArrayList;
28 import java.util.HashSet;
29 import java.util.List;
30
31 import org.monetdb.mcl.MCLException;
32 import org.monetdb.mcl.io.BufferedMCLReader;
33 import org.monetdb.mcl.io.BufferedMCLWriter;
34 import org.monetdb.mcl.parser.MCLParseException;
35
36 /**
37 * A Socket for communicating with the MonetDB database in MAPI block
38 * mode.
39 *
40 * The MapiSocket implements the protocol specifics of the MAPI block
41 * mode protocol, and interfaces it as a socket that delivers a
42 * BufferedReader and a BufferedWriter. Because logging in is an
43 * integral part of the MAPI protocol, the MapiSocket performs the login
44 * procedure. Like the Socket class, various options can be set before
45 * calling the connect() method to influence the login process. Only
46 * after a successful call to connect() the BufferedReader and
47 * BufferedWriter can be retrieved.
48 *
49 * For each line read, it is determined what type of line it is
50 * according to the MonetDB MAPI protocol. This results in a line to be
51 * PROMPT, HEADER, RESULT, ERROR or UNKNOWN. Use the getLineType()
52 * method on the BufferedMCLReader to retrieve the type of the last
53 * line read.
54 *
55 * For debugging purposes a socket level debugging is implemented where
56 * each and every interaction to and from the MonetDB server is logged
57 * to a file on disk.
58 * Incoming messages are prefixed by "RX" (received by the driver),
59 * outgoing messages by "TX" (transmitted by the driver). Special
60 * decoded non-human readable messages are prefixed with "RD" and "TD"
61 * instead. Following this two char prefix, a timestamp follows as the
62 * number of milliseconds since the UNIX epoch. The rest of the line is
63 * a String representation of the data sent or received.
64 *
65 * The general use of this Socket must be seen only in the full context
66 * of a MAPI connection to a server. It has the same ingredients as a
67 * normal Socket, allowing for seamless plugging.
68 * <pre>
69 * Socket \ / InputStream ----&gt; (BufferedMCL)Reader
70 * &gt; o &lt;
71 * MapiSocket / \ OutputStream ----&gt; (BufferedMCL)Writer
72 * </pre>
73 * The MapiSocket allows to retrieve Streams for communicating. They
74 * are interfaced, so they can be chained in any way. While the Socket
75 * transparently deals with how data is sent over the wire, the actual
76 * data read needs to be interpreted, for which a Reader/Writer
77 * interface is most sufficient. In particular the BufferedMCL*
78 * implementations of those interfaces supply some extra functionality
79 * geared towards the format of the data.
80 *
81 * @author Fabian Groffen
82 * @version 4.1
83 * @see org.monetdb.mcl.io.BufferedMCLReader
84 * @see org.monetdb.mcl.io.BufferedMCLWriter
85 */
86 public class MapiSocket { /* cannot (yet) be final as nl.cwi.monetdb.mcl.net.MapiSocket extends this class */
87 /** The TCP Socket to mserver */
88 private Socket con;
89 /** The TCP Socket timeout in milliseconds. Default is 0 meaning the timeout is disabled (i.e., timeout of infinity) */
90 private int soTimeout = 0;
91 /** Stream from the Socket for reading */
92 private InputStream fromMonet;
93 /** Stream from the Socket for writing */
94 private OutputStream toMonet;
95 /** MCLReader on the InputStream */
96 private BufferedMCLReader reader;
97 /** MCLWriter on the OutputStream */
98 private BufferedMCLWriter writer;
99 /** protocol version of the connection */
100 private int version;
101
102 /** The database to connect to */
103 private String database = null;
104 /** The language to connect with */
105 private String language = "sql";
106 /** The hash methods to use (null = default) */
107 private String hash = null;
108
109 /** Whether we should follow redirects */
110 private boolean followRedirects = true;
111 /** How many redirections do we follow until we're fed up with it? */
112 private int ttl = 10;
113
114 /** Whether we are debugging or not */
115 private boolean debug = false;
116 /** The Writer for the debug log-file */
117 private Writer log;
118
119 /** The blocksize (hardcoded in compliance with MonetDB common/stream/stream.h) */
120 public final static int BLOCK = 8 * 1024 - 2;
121
122 /** A short in two bytes for holding the block size in bytes */
123 private final byte[] blklen = new byte[2];
124
125 /**
126 * Constructs a new MapiSocket.
127 */
128 public MapiSocket() {
129 con = null;
130 }
131
132 /**
133 * Sets the database to connect to. If database is null, a
134 * connection is made to the default database of the server. This
135 * is also the default.
136 *
137 * @param db the database
138 */
139 public void setDatabase(final String db) {
140 this.database = db;
141 }
142
143 /**
144 * Sets the language to use for this connection.
145 *
146 * @param lang the language
147 */
148 public void setLanguage(final String lang) {
149 this.language = lang;
150 }
151
152 /**
153 * Sets the hash method to use. Note that this method is intended
154 * for debugging purposes. Setting a hash method can yield in
155 * connection failures. Multiple hash methods can be given by
156 * separating the hashes by commas.
157 * DON'T USE THIS METHOD if you don't know what you're doing.
158 *
159 * @param hash the hash method to use
160 */
161 public void setHash(final String hash) {
162 this.hash = hash;
163 }
164
165 /**
166 * Sets whether MCL redirections should be followed or not. If set
167 * to false, an MCLException will be thrown when a redirect is
168 * encountered during connect. The default bahaviour is to
169 * automatically follow redirects.
170 *
171 * @param r whether to follow redirects (true) or not (false)
172 */
173 public void setFollowRedirects(final boolean r) {
174 this.followRedirects = r;
175 }
176
177 /**
178 * Sets the number of redirects that are followed when
179 * followRedirects is true. In order to avoid going into an endless
180 * loop due to some evil server, or another error, a maximum number
181 * of redirects that may be followed can be set here. Note that to
182 * disable the following of redirects you should use
183 * setFollowRedirects.
184 *
185 * @see #setFollowRedirects(boolean r)
186 * @param t the number of redirects before an exception is thrown
187 */
188 public void setTTL(final int t) {
189 this.ttl = t;
190 }
191
192 /**
193 * Set the SO_TIMEOUT on the underlying Socket. When for some
194 * reason the connection to the database hangs, this setting can be
195 * useful to break out of this indefinite wait.
196 * This option must be enabled prior to entering the blocking
197 * operation to have effect.
198 *
199 * @param s The specified timeout, in milliseconds. A timeout
200 * of zero will disable timeout (i.e., timeout of infinity).
201 * @throws SocketException Issue with the socket
202 */
203 public void setSoTimeout(final int s) throws SocketException {
204 if (s < 0) {
205 throw new IllegalArgumentException("timeout can't be negative");
206 }
207 this.soTimeout = s;
208 // limit time to wait on blocking operations
209 if (con != null) {
210 con.setSoTimeout(s);
211 }
212 }
213
214 /**
215 * Gets the SO_TIMEOUT from the underlying Socket.
216 *
217 * @return the currently in use timeout in milliseconds
218 * @throws SocketException Issue with the socket
219 */
220 public int getSoTimeout() throws SocketException {
221 if (con != null) {
222 this.soTimeout = con.getSoTimeout();
223 }
224 return this.soTimeout;
225 }
226
227 /**
228 * Enables/disables debug mode with logging to file
229 *
230 * @param debug Value to set
231 */
232 public void setDebug(final boolean debug) {
233 this.debug = debug;
234 }
235
236 /**
237 * Connects to the given host and port, logging in as the given
238 * user. If followRedirect is false, a RedirectionException is
239 * thrown when a redirect is encountered.
240 *
241 * @param host the hostname, or null for the loopback address
242 * @param port the port number (must be between 0 and 65535, inclusive)
243 * @param user the username
244 * @param pass the password
245 * @return A List with informational (warning) messages. If this
246 * list is empty; then there are no warnings.
247 * @throws IOException if an I/O error occurs when creating the socket
248 * @throws SocketException - if there is an error in the underlying protocol, such as a TCP error.
249 * @throws UnknownHostException if the IP address of the host could not be determined
250 * @throws MCLParseException if bogus data is received
251 * @throws MCLException if an MCL related error occurs
252 */
253 public List<String> connect(final String host, final int port, final String user, final String pass)
254 throws IOException, SocketException, UnknownHostException, MCLParseException, MCLException
255 {
256 // Wrap around the internal connect that needs to know if it
257 // should really make a TCP connection or not.
258 return connect(host, port, user, pass, true);
259 }
260
261 private List<String> connect(final String host, final int port, final String user, final String pass, final boolean makeConnection)
262 throws IOException, SocketException, UnknownHostException, MCLParseException, MCLException
263 {
264 if (ttl-- <= 0)
265 throw new MCLException("Maximum number of redirects reached, aborting connection attempt.");
266
267 if (makeConnection) {
268 con = new Socket(host, port);
269 con.setSoTimeout(this.soTimeout);
270 // set nodelay, as it greatly speeds up small messages (like we often do)
271 con.setTcpNoDelay(true);
272 con.setKeepAlive(true);
273
274 fromMonet = new BlockInputStream(con.getInputStream());
275 toMonet = new BlockOutputStream(con.getOutputStream());
276 try {
277 reader = new BufferedMCLReader(fromMonet, "UTF-8");
278 writer = new BufferedMCLWriter(toMonet, "UTF-8");
279 writer.registerReader(reader);
280 } catch (UnsupportedEncodingException e) {
281 throw new MCLException(e.toString());
282 }
283 }
284
285 final String c = reader.readLine();
286 reader.waitForPrompt();
287 writer.writeLine(getChallengeResponse(c, user, pass, language, database, hash));
288
289 // read monetdb mserver response till prompt
290 final ArrayList<String> redirects = new ArrayList<String>();
291 final List<String> warns = new ArrayList<String>();
292 String err = "", tmp;
293 int lineType;
294 do {
295 tmp = reader.readLine();
296 if (tmp == null)
297 throw new IOException("Read from " +
298 con.getInetAddress().getHostName() + ":" +
299 con.getPort() + ": End of stream reached");
300 lineType = reader.getLineType();
301 if (lineType == BufferedMCLReader.ERROR) {
302 err += "\n" + tmp.substring(7);
303 } else if (lineType == BufferedMCLReader.INFO) {
304 warns.add(tmp.substring(1));
305 } else if (lineType == BufferedMCLReader.REDIRECT) {
306 redirects.add(tmp.substring(1));
307 }
308 } while (lineType != BufferedMCLReader.PROMPT);
309
310 if (err.length() > 0) {
311 close();
312 throw new MCLException(err);
313 }
314
315 if (!redirects.isEmpty()) {
316 if (followRedirects) {
317 // Ok, server wants us to go somewhere else. The list
318 // might have multiple clues on where to go. For now we
319 // don't support anything intelligent but trying the
320 // first one. URI should be in form of:
321 // "mapi:monetdb://host:port/database?arg=value&..."
322 // or
323 // "mapi:merovingian://proxy?arg=value&..."
324 // note that the extra arguments must be obeyed in both
325 // cases
326 final String suri = redirects.get(0).toString();
327 if (!suri.startsWith("mapi:"))
328 throw new MCLException("unsupported redirect: " + suri);
329
330 final URI u;
331 try {
332 u = new URI(suri.substring(5));
333 } catch (java.net.URISyntaxException e) {
334 throw new MCLParseException(e.toString());
335 }
336
337 tmp = u.getQuery();
338 if (tmp != null) {
339 final String args[] = tmp.split("&");
340 for (int i = 0; i < args.length; i++) {
341 int pos = args[i].indexOf("=");
342 if (pos > 0) {
343 tmp = args[i].substring(0, pos);
344 if (tmp.equals("database")) {
345 tmp = args[i].substring(pos + 1);
346 if (!tmp.equals(database)) {
347 warns.add("redirect points to different database: " + tmp);
348 setDatabase(tmp);
349 }
350 } else if (tmp.equals("language")) {
351 tmp = args[i].substring(pos + 1);
352 warns.add("redirect specifies use of different language: " + tmp);
353 setLanguage(tmp);
354 } else if (tmp.equals("user")) {
355 tmp = args[i].substring(pos + 1);
356 if (!tmp.equals(user))
357 warns.add("ignoring different username '" + tmp + "' set by " +
358 "redirect, what are the security implications?");
359 } else if (tmp.equals("password")) {
360 warns.add("ignoring different password set by redirect, " +
361 "what are the security implications?");
362 } else {
363 warns.add("ignoring unknown argument '" + tmp + "' from redirect");
364 }
365 } else {
366 warns.add("ignoring illegal argument from redirect: " + args[i]);
367 }
368 }
369 }
370
371 if (u.getScheme().equals("monetdb")) {
372 // this is a redirect to another (monetdb) server,
373 // which means a full reconnect
374 // avoid the debug log being closed
375 if (debug) {
376 debug = false;
377 close();
378 debug = true;
379 } else {
380 close();
381 }
382 tmp = u.getPath();
383 if (tmp != null && tmp.length() > 0) {
384 tmp = tmp.substring(1).trim();
385 if (!tmp.isEmpty() && !tmp.equals(database)) {
386 warns.add("redirect points to different database: " + tmp);
387 setDatabase(tmp);
388 }
389 }
390 final int p = u.getPort();
391 warns.addAll(connect(u.getHost(), p == -1 ? port : p, user, pass, true));
392 warns.add("Redirect by " + host + ":" + port + " to " + suri);
393 } else if (u.getScheme().equals("merovingian")) {
394 // reuse this connection to inline connect to the
395 // right database that Merovingian proxies for us
396 warns.addAll(connect(host, port, user, pass, false));
397 } else {
398 throw new MCLException("unsupported scheme in redirect: " + suri);
399 }
400 } else {
401 final StringBuilder msg = new StringBuilder("The server sent a redirect for this connection:");
402 for (String it : redirects) {
403 msg.append(" [" + it + "]");
404 }
405 throw new MCLException(msg.toString());
406 }
407 }
408 return warns;
409 }
410
411 /**
412 * A little helper function that processes a challenge string, and
413 * returns a response string for the server. If the challenge
414 * string is null, a challengeless response is returned.
415 *
416 * @param chalstr the challenge string
417 * for example: H8sRMhtevGd:mserver:9:PROT10,RIPEMD160,SHA256,SHA1,COMPRESSION_SNAPPY,COMPRESSION_LZ4:LIT:SHA512:
418 * @param username the username to use
419 * @param password the password to use
420 * @param language the language to use
421 * @param database the database to connect to
422 * @param hash the hash method(s) to use, or NULL for all supported hashes
423 */
424 private String getChallengeResponse(
425 final String chalstr,
426 String username,
427 String password,
428 final String language,
429 final String database,
430 final String hash
431 ) throws MCLParseException, MCLException, IOException {
432 // parse the challenge string, split it on ':'
433 final String[] chaltok = chalstr.split(":");
434 if (chaltok.length <= 5)
435 throw new MCLParseException("Server challenge string unusable! It contains too few (" + chaltok.length + ") tokens: " + chalstr);
436
437 try {
438 version = Integer.parseInt(chaltok[2]); // protocol version
439 } catch (NumberFormatException e) {
440 throw new MCLParseException("Protocol version (" + chaltok[2] + ") unparseable as integer.");
441 }
442
443 // handle the challenge according to the version it is
444 switch (version) {
445 case 9:
446 // proto 9 is like 8, but uses a hash instead of the plain password
447 // the server tells us (in 6th token) which hash in the
448 // challenge after the byte-order token
449
450 String algo;
451 String pwhash = chaltok[5];
452 /* NOTE: Java doesn't support RIPEMD160 :( */
453 /* see: https://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#MessageDigest */
454 if (pwhash.equals("SHA512")) {
455 algo = "SHA-512";
456 } else if (pwhash.equals("SHA384")) {
457 algo = "SHA-384";
458 } else if (pwhash.equals("SHA256")) {
459 algo = "SHA-256";
460 /* NOTE: Java 7 doesn't support SHA-224. Java 8 does but we have not tested it. It is also not requested yet. */
461 } else if (pwhash.equals("SHA1")) {
462 algo = "SHA-1";
463 } else {
464 /* Note: MD5 has been deprecated by security experts and support is removed from Oct 2020 release */
465 throw new MCLException("Unsupported password hash: " + pwhash);
466 }
467 try {
468 final MessageDigest md = MessageDigest.getInstance(algo);
469 md.update(password.getBytes("UTF-8"));
470 password = toHex(md.digest());
471 } catch (NoSuchAlgorithmException e) {
472 throw new MCLException("This JVM does not support password hash: " + pwhash + "\n" + e.toString());
473 } catch (UnsupportedEncodingException e) {
474 throw new MCLException("This JVM does not support UTF-8 encoding\n" + e.toString());
475 }
476
477 // proto 7 (finally) used the challenge and works with a
478 // password hash. The supported implementations come
479 // from the server challenge. We chose the best hash
480 // we can find, in the order SHA512, SHA1, MD5, plain.
481 // Also the byte-order is reported in the challenge string,
482 // which makes sense, since only blockmode is supported.
483 // proto 8 made this obsolete, but retained the
484 // byte-order report for future "binary" transports.
485 // In proto 8, the byte-order of the blocks is always little
486 // endian because most machines today are.
487 final String hashes = (hash == null || hash.isEmpty()) ? chaltok[3] : hash;
488 final HashSet<String> hashesSet = new HashSet<String>(java.util.Arrays.asList(hashes.toUpperCase().split("[, ]"))); // split on comma or space
489
490 // if we deal with merovingian, mask our credentials
491 if (chaltok[1].equals("merovingian") && !language.equals("control")) {
492 username = "merovingian";
493 password = "merovingian";
494 }
495
496 // reuse variables algo and pwhash
497 algo = null;
498 pwhash = null;
499 if (hashesSet.contains("SHA512")) {
500 algo = "SHA-512";
501 pwhash = "{SHA512}";
502 } else if (hashesSet.contains("SHA384")) {
503 algo = "SHA-384";
504 pwhash = "{SHA384}";
505 } else if (hashesSet.contains("SHA256")) {
506 algo = "SHA-256";
507 pwhash = "{SHA256}";
508 } else if (hashesSet.contains("SHA1")) {
509 algo = "SHA-1";
510 pwhash = "{SHA1}";
511 } else {
512 /* Note: MD5 has been deprecated by security experts and support is removed from Oct 2020 release */
513 throw new MCLException("no supported hash algorithms found in " + hashes);
514 }
515 try {
516 final MessageDigest md = MessageDigest.getInstance(algo);
517 md.update(password.getBytes("UTF-8"));
518 md.update(chaltok[0].getBytes("UTF-8")); // salt/key
519 pwhash += toHex(md.digest());
520 } catch (NoSuchAlgorithmException e) {
521 throw new MCLException("This JVM does not support password hash: " + pwhash + "\n" + e.toString());
522 } catch (UnsupportedEncodingException e) {
523 throw new MCLException("This JVM does not support UTF-8 encoding\n" + e.toString());
524 }
525
526 // TODO: some day when we need this, we should store this
527 if (chaltok[4].equals("BIG")) {
528 // byte-order of server is big-endian
529 } else if (chaltok[4].equals("LIT")) {
530 // byte-order of server is little-endian
531 } else {
532 throw new MCLParseException("Invalid byte-order: " + chaltok[4]);
533 }
534
535 // compose and return response
536 return "BIG:" // JVM byte-order is big-endian
537 + username + ":"
538 + pwhash + ":"
539 + language + ":"
540 + (database == null ? "" : database) + ":";
541 default:
542 throw new MCLException("Unsupported protocol version: " + version);
543 }
544 }
545
546 /**
547 * Small helper method to convert a byte string to a hexadecimal
548 * string representation.
549 *
550 * @param digest the byte array to convert
551 * @return the byte array as hexadecimal string
552 */
553 private final static String toHex(final byte[] digest) {
554 final char[] result = new char[digest.length * 2];
555 int pos = 0;
556 for (int i = 0; i < digest.length; i++) {
557 result[pos++] = hexChar((digest[i] & 0xf0) >> 4);
558 result[pos++] = hexChar(digest[i] & 0x0f);
559 }
560 return new String(result);
561 }
562
563 private final static char hexChar(final int n) {
564 return (n > 9)
565 ? (char) ('a' + (n - 10))
566 : (char) ('0' + n);
567 }
568
569
570 /**
571 * Returns an InputStream that reads from this open connection on
572 * the MapiSocket.
573 *
574 * @return an input stream that reads from this open connection
575 */
576 public InputStream getInputStream() {
577 return fromMonet;
578 }
579
580 /**
581 * Returns an output stream for this MapiSocket.
582 *
583 * @return an output stream for writing bytes to this MapiSocket
584 */
585 public OutputStream getOutputStream() {
586 return toMonet;
587 }
588
589 /**
590 * Returns a Reader for this MapiSocket. The Reader is a
591 * BufferedMCLReader which does protocol interpretation of the
592 * BlockInputStream produced by this MapiSocket.
593 *
594 * @return a BufferedMCLReader connected to this MapiSocket
595 */
596 public BufferedMCLReader getReader() {
597 return reader;
598 }
599
600 /**
601 * Returns a Writer for this MapiSocket. The Writer is a
602 * BufferedMCLWriter which produces protocol compatible data blocks
603 * that the BlockOutputStream can properly translate into blocks.
604 *
605 * @return a BufferedMCLWriter connected to this MapiSocket
606 */
607 public BufferedMCLWriter getWriter() {
608 return writer;
609 }
610
611 /**
612 * Returns the mapi protocol version used by this socket. The
613 * protocol version depends on the server being used. Users of the
614 * MapiSocket should check this version to act appropriately.
615 *
616 * @return the mapi protocol version
617 */
618 public int getProtocolVersion() {
619 return version;
620 }
621
622 /**
623 * Enables logging to a file what is read and written from and to
624 * the server. Logging can be enabled at any time. However, it is
625 * encouraged to start debugging before actually connecting the
626 * socket.
627 *
628 * @param filename the name of the file to write to
629 * @throws IOException if the file could not be opened for writing
630 */
631 public void debug(final String filename) throws IOException {
632 debug(new FileWriter(filename));
633 }
634
635 /**
636 * Enables logging to a stream what is read and written from and to
637 * the server. Logging can be enabled at any time. However, it is
638 * encouraged to start debugging before actually connecting the
639 * socket.
640 *
641 * @param out to write the log to a print stream
642 * @throws IOException if the file could not be opened for writing
643 */
644 // disabled as it is not used by JDBC driver code
645 // public void debug(PrintStream out) throws IOException {
646 // debug(new PrintWriter(out));
647 // }
648
649 /**
650 * Enables logging to a stream what is read and written from and to
651 * the server. Logging can be enabled at any time. However, it is
652 * encouraged to start debugging before actually connecting the
653 * socket.
654 *
655 * @param out to write the log to
656 */
657 public void debug(final Writer out) {
658 log = out;
659 debug = true;
660 }
661
662 /**
663 * Get the log Writer.
664 *
665 * @return the log writer
666 */
667 public Writer getLogWriter() {
668 return log;
669 }
670
671 /**
672 * Writes a logline tagged with a timestamp using the given type and message
673 * and optionally flushes afterwards.
674 *
675 * Used for debugging purposes only and represents a message data that is
676 * connected to reading (RD or RX) or writing (TD or TX) to the socket.
677 * R=Receive, T=Transmit, D=Data, X=??
678 *
679 * @param type message type: either RD, RX, TD or TX
680 * @param message the message to log
681 * @param flush whether we need to flush buffered data to the logfile.
682 * @throws IOException if an IO error occurs while writing to the logfile
683 */
684 private final void log(final String type, final String message, final boolean flush) throws IOException {
685 log.write(type + System.currentTimeMillis() + ": " + message + "\n");
686 if (flush)
687 log.flush();
688 }
689
690 /**
691 * Inner class that is used to write data on a normal stream as a
692 * blocked stream. A call to the flush() method will write a
693 * "final" block to the underlying stream. Non-final blocks are
694 * written as soon as one or more bytes would not fit in the
695 * current block any more. This allows to write to a block to it's
696 * full size, and then flush it explicitly to have a final block
697 * being written to the stream.
698 */
699 final class BlockOutputStream extends FilterOutputStream {
700 private int writePos = 0;
701 private int blocksize = 0;
702 private final byte[] block = new byte[BLOCK];
703
704 /**
705 * Constructs this BlockOutputStream, backed by the given
706 * OutputStream. A BufferedOutputStream is internally used.
707 */
708 public BlockOutputStream(final OutputStream out) {
709 // always use a buffered stream, even though we know how
710 // much bytes to write/read, since this is just faster for
711 // some reason
712 super(new BufferedOutputStream(out));
713 }
714
715 @Override
716 public void flush() throws IOException {
717 // write the block (as final) then flush.
718 writeBlock(true);
719 out.flush();
720
721 // it's a bit nasty if an exception is thrown from the log,
722 // but ignoring it can be nasty as well, so it is decided to
723 // let it go so there is feedback about something going wrong
724 // it's a bit nasty if an exception is thrown from the log,
725 // but ignoring it can be nasty as well, so it is decided to
726 // let it go so there is feedback about something going wrong
727 if (debug) {
728 log.flush();
729 }
730 }
731
732 /**
733 * writeBlock puts the data in the block on the stream. The
734 * boolean last controls whether the block is sent with an
735 * indicator to note it is the last block of a sequence or not.
736 *
737 * @param last whether this is the last block
738 * @throws IOException if writing to the stream failed
739 */
740 public void writeBlock(final boolean last) throws IOException {
741 if (last) {
742 // always fits, because of BLOCK's size
743 blocksize = (short)writePos;
744 // this is the last block, so encode least
745 // significant bit in the first byte (little-endian)
746 blklen[0] = (byte)(blocksize << 1 & 0xFF | 1);
747 blklen[1] = (byte)(blocksize >> 7);
748 } else {
749 // always fits, because of BLOCK's size
750 blocksize = (short)BLOCK;
751 // another block will follow, encode least
752 // significant bit in the first byte (little-endian)
753 blklen[0] = (byte)(blocksize << 1 & 0xFF);
754 blklen[1] = (byte)(blocksize >> 7);
755 }
756
757 out.write(blklen);
758 // write the actual block
759 out.write(block, 0, writePos);
760
761 if (debug) {
762 if (last) {
763 log("TD ", "write final block: " + writePos + " bytes", false);
764 } else {
765 log("TD ", "write block: " + writePos + " bytes", false);
766 }
767 log("TX ", new String(block, 0, writePos, "UTF-8"), true);
768 }
769
770 writePos = 0;
771 }
772
773 @Override
774 public void write(final int b) throws IOException {
775 if (writePos == BLOCK) {
776 writeBlock(false);
777 }
778 block[writePos++] = (byte)b;
779 }
780
781 @Override
782 public void write(final byte[] b) throws IOException {
783 write(b, 0, b.length);
784 }
785
786 @Override
787 public void write(final byte[] b, int off, int len) throws IOException {
788 int t = 0;
789 while (len > 0) {
790 t = BLOCK - writePos;
791 if (len > t) {
792 System.arraycopy(b, off, block, writePos, t);
793 off += t;
794 len -= t;
795 writePos += t;
796 writeBlock(false);
797 } else {
798 System.arraycopy(b, off, block, writePos, len);
799 writePos += len;
800 break;
801 }
802 }
803 }
804
805 @Override
806 public void close() throws IOException {
807 // we don't want the flush() method to be called (default of
808 // the FilterOutputStream), so we close manually here
809 out.close();
810 }
811 }
812
813
814 /**
815 * Inner class that is used to make the data on the blocked stream
816 * available as a normal stream.
817 */
818 final class BlockInputStream extends FilterInputStream {
819 private int readPos = 0;
820 private int blockLen = 0;
821 private final byte[] block = new byte[BLOCK + 3]; // \n.\n
822
823 /**
824 * Constructs this BlockInputStream, backed by the given
825 * InputStream. A BufferedInputStream is internally used.
826 */
827 public BlockInputStream(final InputStream in) {
828 // always use a buffered stream, even though we know how
829 // much bytes to write/read, since this is just faster for
830 // some reason
831 super(new BufferedInputStream(in));
832 }
833
834 @Override
835 public int available() {
836 return blockLen - readPos;
837 }
838
839 @Override
840 public boolean markSupported() {
841 return false;
842 }
843
844 @Override
845 public void mark(final int readlimit) {
846 throw new AssertionError("Not implemented!");
847 }
848
849 @Override
850 public void reset() {
851 throw new AssertionError("Not implemented!");
852 }
853
854 /**
855 * Small wrapper to get a blocking variant of the read() method
856 * on the BufferedInputStream. We want to benefit from the
857 * Buffered pre-fetching, but not dealing with half blocks.
858 * Changing this class to be able to use the partially received
859 * data will greatly complicate matters, while a performance
860 * improvement is debatable given the relatively small size of
861 * our blocks. Maybe it does speed up on slower links, then
862 * consider this method a quick bug fix/workaround.
863 *
864 * @return false if reading the block failed due to EOF
865 */
866 private boolean _read(final byte[] b, int len) throws IOException {
867 int s;
868 int off = 0;
869 while (len > 0) {
870 s = in.read(b, off, len);
871 if (s == -1) {
872 // if we have read something before, we should have been
873 // able to read the whole, so make this fatal
874 if (off > 0) {
875 if (debug) {
876 log("RD ", "the following incomplete block was received:", false);
877 log("RX ", new String(b, 0, off, "UTF-8"), true);
878 }
879 throw new IOException("Read from " +
880 con.getInetAddress().getHostName() + ":" +
881 con.getPort() + ": Incomplete block read from stream");
882 }
883 if (debug)
884 log("RD ", "server closed the connection (EOF)", true);
885 return false;
886 }
887 len -= s;
888 off += s;
889 }
890
891 return true;
892 }
893
894 /**
895 * Reads the next block on the stream into the internal buffer,
896 * or writes the prompt in the buffer.
897 *
898 * The blocked stream protocol consists of first a two byte
899 * integer indicating the length of the block, then the
900 * block, followed by another length + block. The end of
901 * such sequence is put in the last bit of the length, and
902 * hence this length should be shifted to the right to
903 * obtain the real length value first. We simply fetch
904 * blocks here as soon as they are needed for the stream's
905 * read methods.
906 *
907 * The user-flush, which is an implicit effect of the end of
908 * a block sequence, is communicated beyond the stream by
909 * inserting a prompt sequence on the stream after the last
910 * block. This method makes sure that a final block ends with a
911 * newline, if it doesn't already, in order to facilitate a
912 * Reader that is possibly chained to this InputStream.
913 *
914 * If the stream is not positioned correctly, hell will break
915 * loose.
916 */
917 private int readBlock() throws IOException {
918 // read next two bytes (short)
919 if (!_read(blklen, 2))
920 return(-1);
921
922 // Get the short-value and store its value in blockLen.
923 blockLen = (short)(
924 (blklen[0] & 0xFF) >> 1 |
925 (blklen[1] & 0xFF) << 7
926 );
927 readPos = 0;
928
929 if (debug) {
930 if ((blklen[0] & 0x1) == 1) {
931 log("RD ", "read final block: " + blockLen + " bytes", false);
932 } else {
933 log("RD ", "read new block: " + blockLen + " bytes", false);
934 }
935 }
936
937 // sanity check to avoid bad servers make us do an ugly
938 // stack trace
939 if (blockLen > block.length)
940 throw new IOException("Server sent a block larger than BLOCKsize: " +
941 blockLen + " > " + block.length);
942 if (!_read(block, blockLen))
943 return -1;
944
945 if (debug)
946 log("RX ", new String(block, 0, blockLen, "UTF-8"), true);
947
948 // if this is the last block, make it end with a newline and prompt
949 if ((blklen[0] & 0x1) == 1) {
950 if (blockLen > 0 && block[blockLen - 1] != '\n') {
951 // to terminate the block in a Reader
952 block[blockLen++] = '\n';
953 }
954 // insert 'fake' flush
955 block[blockLen++] = BufferedMCLReader.PROMPT;
956 block[blockLen++] = '\n';
957 if (debug)
958 log("RD ", "inserting prompt", true);
959 }
960
961 return blockLen;
962 }
963
964 @Override
965 public int read() throws IOException {
966 if (available() == 0) {
967 if (readBlock() == -1)
968 return -1;
969 }
970
971 if (debug)
972 log("RX ", new String(block, readPos, 1, "UTF-8"), true);
973
974 return (int)block[readPos++];
975 }
976
977 @Override
978 public int read(final byte[] b) throws IOException {
979 return read(b, 0, b.length);
980 }
981
982 @Override
983 public int read(final byte[] b, int off, int len) throws IOException {
984 int t;
985 int size = 0;
986 while (size < len) {
987 t = available();
988 if (t == 0) {
989 if (size != 0)
990 break;
991 if (readBlock() == -1) {
992 if (size == 0)
993 size = -1;
994 break;
995 }
996 t = available();
997 }
998 if (len > t) {
999 System.arraycopy(block, readPos, b, off, t);
1000 off += t;
1001 len -= t;
1002 readPos += t;
1003 size += t;
1004 } else {
1005 System.arraycopy(block, readPos, b, off, len);
1006 readPos += len;
1007 size += len;
1008 break;
1009 }
1010 }
1011 return size;
1012 }
1013
1014 @Override
1015 public long skip(final long n) throws IOException {
1016 long skip = n;
1017 int t = 0;
1018 while (skip > 0) {
1019 t = available();
1020 if (skip > t) {
1021 skip -= t;
1022 readPos += t;
1023 readBlock();
1024 } else {
1025 readPos += skip;
1026 break;
1027 }
1028 }
1029 return n;
1030 }
1031 }
1032
1033 /**
1034 * Closes the streams and socket connected to the server if possible.
1035 * If an error occurs at closing a resource, it is ignored so as many
1036 * resources as possible are closed.
1037 */
1038 public synchronized void close() {
1039 if (writer != null) {
1040 try {
1041 writer.close();
1042 writer = null;
1043 } catch (IOException e) { /* ignore it */ }
1044 }
1045 if (reader != null) {
1046 try {
1047 reader.close();
1048 reader = null;
1049 } catch (IOException e) { /* ignore it */ }
1050 }
1051 if (toMonet != null) {
1052 try {
1053 toMonet.close();
1054 toMonet = null;
1055 } catch (IOException e) { /* ignore it */ }
1056 }
1057 if (fromMonet != null) {
1058 try {
1059 fromMonet.close();
1060 fromMonet = null;
1061 } catch (IOException e) { /* ignore it */ }
1062 }
1063 if (con != null) {
1064 try {
1065 con.close(); // close the socket
1066 con = null;
1067 } catch (IOException e) { /* ignore it */ }
1068 }
1069 if (debug && log != null && log instanceof FileWriter) {
1070 try {
1071 log.close();
1072 log = null;
1073 } catch (IOException e) { /* ignore it */ }
1074 }
1075 }
1076
1077 /**
1078 * Destructor called by garbage collector before destroying this
1079 * object tries to disconnect the MonetDB connection if it has not
1080 * been disconnected already.
1081 *
1082 * @deprecated (since="9")
1083 */
1084 @Override
1085 @Deprecated
1086 protected void finalize() throws Throwable {
1087 close();
1088 super.finalize();
1089 }
1090 }