comparison src/main/java/nl/cwi/monetdb/mcl/net/MapiSocket.java @ 0:a5a898f6886c

Copy of MonetDB java directory changeset e6e32756ad31.
author Sjoerd Mullender <sjoerd@acm.org>
date Wed, 21 Sep 2016 09:34:48 +0200 (2016-09-21)
parents
children 7e0d71a22677 6cc63d6cb224
comparison
equal deleted inserted replaced
-1:000000000000 0:a5a898f6886c
1 /*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2016 MonetDB B.V.
7 */
8
9 package nl.cwi.monetdb.mcl.net;
10
11 import java.io.BufferedInputStream;
12 import java.io.BufferedOutputStream;
13 import java.io.EOFException;
14 import java.io.FileWriter;
15 import java.io.FilterInputStream;
16 import java.io.FilterOutputStream;
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.io.OutputStream;
20 import java.io.PrintStream;
21 import java.io.PrintWriter;
22 import java.io.UnsupportedEncodingException;
23 import java.io.Writer;
24 import java.net.Socket;
25 import java.net.SocketException;
26 import java.net.URI;
27 import java.net.URISyntaxException;
28 import java.security.MessageDigest;
29 import java.security.NoSuchAlgorithmException;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Set;
35
36 import nl.cwi.monetdb.mcl.MCLException;
37 import nl.cwi.monetdb.mcl.io.BufferedMCLReader;
38 import nl.cwi.monetdb.mcl.io.BufferedMCLWriter;
39 import nl.cwi.monetdb.mcl.parser.MCLParseException;
40
41 /**
42 * A Socket for communicating with the MonetDB database in MAPI block
43 * mode.
44 *
45 * The MapiSocket implements the protocol specifics of the MAPI block
46 * mode protocol, and interfaces it as a socket that delivers a
47 * BufferedReader and a BufferedWriter. Because logging in is an
48 * integral part of the MAPI protocol, the MapiSocket performs the login
49 * procedure. Like the Socket class, various options can be set before
50 * calling the connect() method to influence the login process. Only
51 * after a successful call to connect() the BufferedReader and
52 * BufferedWriter can be retrieved.
53 * <br />
54 * For each line read, it is determined what type of line it is
55 * according to the MonetDB MAPI protocol. This results in a line to be
56 * PROMPT, HEADER, RESULT, ERROR or UNKNOWN. Use the getLineType()
57 * method on the BufferedMCLReader to retrieve the type of the last
58 * line read.
59 *
60 * For debugging purposes a socket level debugging is implemented where
61 * each and every interaction to and from the MonetDB server is logged
62 * to a file on disk.<br />
63 * Incoming messages are prefixed by "RX" (received by the driver),
64 * outgoing messages by "TX" (transmitted by the driver). Special
65 * decoded non-human readable messages are prefixed with "RD" and "TD"
66 * instead. Following this two char prefix, a timestamp follows as the
67 * number of milliseconds since the UNIX epoch. The rest of the line is
68 * a String representation of the data sent or received.
69 *
70 * The general use of this Socket must be seen only in the full context
71 * of a MAPI connection to a server. It has the same ingredients as a
72 * normal Socket, allowing for seamless plugging.
73 * <pre>
74 * Socket \ / InputStream ----&gt; (BufferedMCL)Reader
75 * &gt; o &lt;
76 * MapiSocket / \ OutputStream ----&gt; (BufferedMCL)Writer
77 * </pre>
78 * The MapiSocket allows to retrieve Streams for communicating. They
79 * are interfaced, so they can be chained in any way. While the Socket
80 * transparently deals with how data is sent over the wire, the actual
81 * data read needs to be interpreted, for which a Reader/Writer
82 * interface is most sufficient. In particular the BufferedMCL*
83 * implementations of those interfaces supply some extra functionality
84 * geared towards the format of the data.
85 *
86 * @author Fabian Groffen
87 * @version 4.1
88 * @see nl.cwi.monetdb.mcl.io.BufferedMCLReader
89 * @see nl.cwi.monetdb.mcl.io.BufferedMCLWriter
90 */
91 public final class MapiSocket {
92 /** The TCP Socket to mserver */
93 private Socket con;
94 /** Stream from the Socket for reading */
95 private InputStream fromMonet;
96 /** Stream from the Socket for writing */
97 private OutputStream toMonet;
98 /** MCLReader on the InputStream */
99 private BufferedMCLReader reader;
100 /** MCLWriter on the OutputStream */
101 private BufferedMCLWriter writer;
102 /** protocol version of the connection */
103 private int version;
104
105 /** The database to connect to */
106 private String database = null;
107 /** The language to connect with */
108 private String language = "sql";
109 /** The hash methods to use (null = default) */
110 private String hash = null;
111 /** Whether we should follow redirects */
112 private boolean followRedirects = true;
113 /** How many redirections do we follow until we're fed up with it? */
114 private int ttl = 10;
115 /** Whether we are debugging or not */
116 private boolean debug = false;
117 /** The Writer for the debug log-file */
118 private Writer log;
119
120 /** The blocksize (hardcoded in compliance with stream.mx) */
121 public final static int BLOCK = 8 * 1024 - 2;
122
123 /** A short in two bytes for holding the block size in bytes */
124 private byte[] blklen = new byte[2];
125
126 /**
127 * Constructs a new MapiSocket.
128 */
129 public MapiSocket() {
130 con = null;
131 }
132
133 /**
134 * Sets the database to connect to. If database is null, a
135 * connection is made to the default database of the server. This
136 * is also the default.
137 *
138 * @param db the database
139 */
140 public void setDatabase(String db) {
141 this.database = db;
142 }
143
144 /**
145 * Sets the language to use for this connection.
146 *
147 * @param lang the language
148 */
149 public void setLanguage(String lang) {
150 this.language = lang;
151 }
152
153 /**
154 * Sets the hash method to use. Note that this method is intended
155 * for debugging purposes. Setting a hash method can yield in
156 * connection failures. Multiple hash methods can be given by
157 * separating the hashes by commas.
158 * DON'T USE THIS METHOD if you don't know what you're doing.
159 *
160 * @param hash the hash method to use
161 */
162 public void setHash(String hash) {
163 this.hash = hash;
164 }
165
166 /**
167 * Sets whether MCL redirections should be followed or not. If set
168 * to false, an MCLException will be thrown when a redirect is
169 * encountered during connect. The default bahaviour is to
170 * automatically follow redirects.
171 *
172 * @param r whether to follow redirects (true) or not (false)
173 */
174 public void setFollowRedirects(boolean r) {
175 this.followRedirects = r;
176 }
177
178 /**
179 * Sets the number of redirects that are followed when
180 * followRedirects is true. In order to avoid going into an endless
181 * loop due to some evil server, or another error, a maximum number
182 * of redirects that may be followed can be set here. Note that to
183 * disable the following of redirects you should use
184 * setFollowRedirects.
185 *
186 * @see #setFollowRedirects(boolean r)
187 * @param t the number of redirects before an exception is thrown
188 */
189 public void setTTL(int t) {
190 this.ttl = t;
191 }
192
193 /**
194 * Set the SO_TIMEOUT on the underlying Socket. When for some
195 * reason the connection to the database hangs, this setting can be
196 * useful to break out of this indefinite wait.
197 * This option must be enabled prior to entering the blocking
198 * operation to have effect.
199 *
200 * @param s The specified timeout, in milliseconds. A timeout
201 * of zero is interpreted as an infinite timeout.
202 * @throws SocketException Issue with the socket
203 */
204 public void setSoTimeout(int s) throws SocketException {
205 // limit time to wait on blocking operations (0 = indefinite)
206 con.setSoTimeout(s);
207 }
208
209 /**
210 * Gets the SO_TIMEOUT from the underlying Socket.
211 *
212 * @return the currently in use timeout in milliseconds
213 * @throws SocketException Issue with the socket
214 */
215 public int getSoTimeout() throws SocketException {
216 return con.getSoTimeout();
217 }
218
219 /**
220 * Enables/disables debug
221 *
222 * @param debug Value to set
223 */
224 public void setDebug(boolean debug) {
225 this.debug = debug;
226 }
227
228 /**
229 * Connects to the given host and port, logging in as the given
230 * user. If followRedirect is false, a RedirectionException is
231 * thrown when a redirect is encountered.
232 *
233 * @param host the hostname, or null for the loopback address
234 * @param port the port number
235 * @param user the username
236 * @param pass the password
237 * @return A List with informational (warning) messages. If this
238 * list is empty; then there are no warnings.
239 * @throws IOException if an I/O error occurs when creating the
240 * socket
241 * @throws MCLParseException if bogus data is received
242 * @throws MCLException if an MCL related error occurs
243 */
244 public List<String> connect(String host, int port, String user, String pass)
245 throws IOException, MCLParseException, MCLException {
246 // Wrap around the internal connect that needs to know if it
247 // should really make a TCP connection or not.
248 return connect(host, port, user, pass, true);
249 }
250
251 private List<String> connect(String host, int port, String user, String pass,
252 boolean makeConnection)
253 throws IOException, MCLParseException, MCLException {
254 if (ttl-- <= 0)
255 throw new MCLException("Maximum number of redirects reached, aborting connection attempt. Sorry.");
256
257 if (makeConnection) {
258 con = new Socket(host, port);
259 // set nodelay, as it greatly speeds up small messages (like we
260 // often do)
261 con.setTcpNoDelay(true);
262
263 fromMonet = new BlockInputStream(con.getInputStream());
264 toMonet = new BlockOutputStream(con.getOutputStream());
265 try {
266 reader = new BufferedMCLReader(fromMonet, "UTF-8");
267 writer = new BufferedMCLWriter(toMonet, "UTF-8");
268 } catch (UnsupportedEncodingException e) {
269 throw new AssertionError(e.toString());
270 }
271 writer.registerReader(reader);
272 }
273
274 String c = reader.readLine();
275 reader.waitForPrompt();
276 writer.writeLine(
277 getChallengeResponse(
278 c,
279 user,
280 pass,
281 language,
282 database,
283 hash
284 )
285 );
286
287 // read monet response till prompt
288 List<String> redirects = new ArrayList<String>();
289 List<String> warns = new ArrayList<String>();
290 String err = "", tmp;
291 int lineType;
292 do {
293 if ((tmp = reader.readLine()) == null)
294 throw new IOException("Read from " +
295 con.getInetAddress().getHostName() + ":" +
296 con.getPort() + ": End of stream reached");
297 if ((lineType = reader.getLineType()) == BufferedMCLReader.ERROR) {
298 err += "\n" + tmp.substring(7);
299 } else if (lineType == BufferedMCLReader.INFO) {
300 warns.add(tmp.substring(1));
301 } else if (lineType == BufferedMCLReader.REDIRECT) {
302 redirects.add(tmp.substring(1));
303 }
304 } while (lineType != BufferedMCLReader.PROMPT);
305 if (err != "") {
306 close();
307 throw new MCLException(err.trim());
308 }
309 if (!redirects.isEmpty()) {
310 if (followRedirects) {
311 // Ok, server wants us to go somewhere else. The list
312 // might have multiple clues on where to go. For now we
313 // don't support anything intelligent but trying the
314 // first one. URI should be in form of:
315 // "mapi:monetdb://host:port/database?arg=value&..."
316 // or
317 // "mapi:merovingian://proxy?arg=value&..."
318 // note that the extra arguments must be obeyed in both
319 // cases
320 String suri = redirects.get(0).toString();
321 if (!suri.startsWith("mapi:"))
322 throw new MCLException("unsupported redirect: " + suri);
323
324 URI u;
325 try {
326 u = new URI(suri.substring(5));
327 } catch (URISyntaxException e) {
328 throw new MCLParseException(e.toString());
329 }
330
331 tmp = u.getQuery();
332 if (tmp != null) {
333 String args[] = tmp.split("&");
334 for (int i = 0; i < args.length; i++) {
335 int pos = args[i].indexOf("=");
336 if (pos > 0) {
337 tmp = args[i].substring(0, pos);
338 if (tmp.equals("database")) {
339 tmp = args[i].substring(pos + 1);
340 if (!tmp.equals(database)) {
341 warns.add("redirect points to different " +
342 "database: " + tmp);
343 setDatabase(tmp);
344 }
345 } else if (tmp.equals("language")) {
346 tmp = args[i].substring(pos + 1);
347 warns.add("redirect specifies use of different language: " + tmp);
348 setLanguage(tmp);
349 } else if (tmp.equals("user")) {
350 tmp = args[i].substring(pos + 1);
351 if (!tmp.equals(user))
352 warns.add("ignoring different username '" + tmp + "' set by " +
353 "redirect, what are the security implications?");
354 } else if (tmp.equals("password")) {
355 warns.add("ignoring different password set by redirect, " +
356 "what are the security implications?");
357 } else {
358 warns.add("ignoring unknown argument '" + tmp + "' from redirect");
359 }
360 } else {
361 warns.add("ignoring illegal argument from redirect: " +
362 args[i]);
363 }
364 }
365 }
366
367 if (u.getScheme().equals("monetdb")) {
368 // this is a redirect to another (monetdb) server,
369 // which means a full reconnect
370 // avoid the debug log being closed
371 if (debug) {
372 debug = false;
373 close();
374 debug = true;
375 } else {
376 close();
377 }
378 tmp = u.getPath();
379 if (tmp != null && tmp.length() != 0) {
380 tmp = tmp.substring(1).trim();
381 if (!tmp.isEmpty() && !tmp.equals(database)) {
382 warns.add("redirect points to different " +
383 "database: " + tmp);
384 setDatabase(tmp);
385 }
386 }
387 int p = u.getPort();
388 warns.addAll(connect(u.getHost(), p == -1 ? port : p,
389 user, pass, true));
390 warns.add("Redirect by " + host + ":" + port + " to " + suri);
391 } else if (u.getScheme().equals("merovingian")) {
392 // reuse this connection to inline connect to the
393 // right database that Merovingian proxies for us
394 warns.addAll(connect(host, port, user, pass, false));
395 } else {
396 throw new MCLException("unsupported scheme in redirect: " + suri);
397 }
398 } else {
399 StringBuilder msg = new StringBuilder("The server sent a redirect for this connection:");
400 for (String it : redirects) {
401 msg.append(" [" + it + "]");
402 }
403 throw new MCLException(msg.toString());
404 }
405 }
406 return warns;
407 }
408
409 /**
410 * A little helper function that processes a challenge string, and
411 * returns a response string for the server. If the challenge
412 * string is null, a challengeless response is returned.
413 *
414 * @param chalstr the challenge string
415 * @param username the username to use
416 * @param password the password to use
417 * @param language the language to use
418 * @param database the database to connect to
419 * @param hash the hash method(s) to use, or NULL for all supported
420 * hashes
421 */
422 private String getChallengeResponse(
423 String chalstr,
424 String username,
425 String password,
426 String language,
427 String database,
428 String hash
429 ) throws MCLParseException, MCLException, IOException {
430 String response;
431 String algo;
432
433 // parse the challenge string, split it on ':'
434 String[] chaltok = chalstr.split(":");
435 if (chaltok.length <= 4) throw
436 new MCLParseException("Server challenge string unusable! Challenge contains too few tokens: " + chalstr);
437
438 // challenge string to use as salt/key
439 String challenge = chaltok[0];
440 String servert = chaltok[1];
441 try {
442 version = Integer.parseInt(chaltok[2].trim()); // protocol version
443 } catch (NumberFormatException e) {
444 throw new MCLParseException("Protocol version unparseable: " + chaltok[3]);
445 }
446
447 // handle the challenge according to the version it is
448 switch (version) {
449 default:
450 throw new MCLException("Unsupported protocol version: " + version);
451 case 9:
452 // proto 9 is like 8, but uses a hash instead of the
453 // plain password, the server tells us which hash in the
454 // challenge after the byte-order
455
456 /* NOTE: Java doesn't support RIPEMD160 :( */
457 if (chaltok[5].equals("SHA512")) {
458 algo = "SHA-512";
459 } else if (chaltok[5].equals("SHA384")) {
460 algo = "SHA-384";
461 } else if (chaltok[5].equals("SHA256")) {
462 algo = "SHA-256";
463 /* NOTE: Java doesn't support SHA-224 */
464 } else if (chaltok[5].equals("SHA1")) {
465 algo = "SHA-1";
466 } else if (chaltok[5].equals("MD5")) {
467 algo = "MD5";
468 } else {
469 throw new MCLException("Unsupported password hash: " +
470 chaltok[5]);
471 }
472
473 try {
474 MessageDigest md = MessageDigest.getInstance(algo);
475 md.update(password.getBytes("UTF-8"));
476 byte[] digest = md.digest();
477 password = toHex(digest);
478 } catch (NoSuchAlgorithmException e) {
479 throw new AssertionError("internal error: " + e.toString());
480 } catch (UnsupportedEncodingException e) {
481 throw new AssertionError("internal error: " + e.toString());
482 }
483
484 // proto 7 (finally) used the challenge and works with a
485 // password hash. The supported implementations come
486 // from the server challenge. We chose the best hash
487 // we can find, in the order SHA1, MD5, plain. Also,
488 // the byte-order is reported in the challenge string,
489 // which makes sense, since only blockmode is supported.
490 // proto 8 made this obsolete, but retained the
491 // byte-order report for future "binary" transports. In
492 // proto 8, the byte-order of the blocks is always little
493 // endian because most machines today are.
494 String hashes = (hash == null ? chaltok[3] : hash);
495 Set<String> hashesSet = new HashSet<String>(Arrays.asList(hashes.toUpperCase().split("[, ]")));
496
497 // if we deal with merovingian, mask our credentials
498 if (servert.equals("merovingian") && !language.equals("control")) {
499 username = "merovingian";
500 password = "merovingian";
501 }
502 String pwhash;
503 algo = null;
504
505 if (hashesSet.contains("SHA512")) {
506 algo = "SHA-512";
507 pwhash = "{SHA512}";
508 } else if (hashesSet.contains("SHA384")) {
509 algo = "SHA-384";
510 pwhash = "{SHA384}";
511 } else if (hashesSet.contains("SHA256")) {
512 algo = "SHA-256";
513 pwhash = "{SHA256}";
514 } else if (hashesSet.contains("SHA1")) {
515 algo = "SHA-1";
516 pwhash = "{SHA1}";
517 } else if (hashesSet.contains("MD5")) {
518 algo = "MD5";
519 pwhash = "{MD5}";
520 } else {
521 throw new MCLException("no supported password hashes in " + hashes);
522 }
523 if (algo != null) {
524 try {
525 MessageDigest md = MessageDigest.getInstance(algo);
526 md.update(password.getBytes("UTF-8"));
527 md.update(challenge.getBytes("UTF-8"));
528 byte[] digest = md.digest();
529 pwhash += toHex(digest);
530 } catch (NoSuchAlgorithmException e) {
531 throw new AssertionError("internal error: " + e.toString());
532 } catch (UnsupportedEncodingException e) {
533 throw new AssertionError("internal error: " + e.toString());
534 }
535 }
536 // TODO: some day when we need this, we should store
537 // this
538 if (chaltok[4].equals("BIG")) {
539 // byte-order of server is big-endian
540 } else if (chaltok[4].equals("LIT")) {
541 // byte-order of server is little-endian
542 } else {
543 throw new MCLParseException("Invalid byte-order: " + chaltok[5]);
544 }
545
546 // generate response
547 response = "BIG:"; // JVM byte-order is big-endian
548 response += username + ":" + pwhash + ":" + language;
549 response += ":" + (database == null ? "" : database) + ":";
550
551 return response;
552 }
553 }
554
555 private static char hexChar(int n) {
556 return (n > 9)
557 ? (char) ('a' + (n - 10))
558 : (char) ('0' + n);
559 }
560
561 /**
562 * Small helper method to convert a byte string to a hexadecimal
563 * string representation.
564 *
565 * @param digest the byte array to convert
566 * @return the byte array as hexadecimal string
567 */
568 private static String toHex(byte[] digest) {
569 char[] result = new char[digest.length * 2];
570 int pos = 0;
571 for (int i = 0; i < digest.length; i++) {
572 result[pos++] = hexChar((digest[i] & 0xf0) >> 4);
573 result[pos++] = hexChar(digest[i] & 0x0f);
574 }
575 return new String(result);
576 }
577
578 /**
579 * Returns an InputStream that reads from this open connection on
580 * the MapiSocket.
581 *
582 * @return an input stream that reads from this open connection
583 */
584 public InputStream getInputStream() {
585 return fromMonet;
586 }
587
588 /**
589 * Returns an output stream for this MapiSocket.
590 *
591 * @return an output stream for writing bytes to this MapiSocket
592 */
593 public OutputStream getOutputStream() {
594 return toMonet;
595 }
596
597 /**
598 * Returns a Reader for this MapiSocket. The Reader is a
599 * BufferedMCLReader which does protocol interpretation of the
600 * BlockInputStream produced by this MapiSocket.
601 *
602 * @return a BufferedMCLReader connected to this MapiSocket
603 */
604 public BufferedMCLReader getReader() {
605 return reader;
606 }
607
608 /**
609 * Returns a Writer for this MapiSocket. The Writer is a
610 * BufferedMCLWriter which produces protocol compatible data blocks
611 * that the BlockOutputStream can properly translate into blocks.
612 *
613 * @return a BufferedMCLWriter connected to this MapiSocket
614 */
615 public BufferedMCLWriter getWriter() {
616 return writer;
617 }
618
619 /**
620 * Returns the mapi protocol version used by this socket. The
621 * protocol version depends on the server being used. Users of the
622 * MapiSocket should check this version to act appropriately.
623 *
624 * @return the mapi protocol version
625 */
626 public int getProtocolVersion() {
627 return version;
628 }
629
630 /**
631 * Enables logging to a file what is read and written from and to
632 * the server. Logging can be enabled at any time. However, it is
633 * encouraged to start debugging before actually connecting the
634 * socket.
635 *
636 * @param filename the name of the file to write to
637 * @throws IOException if the file could not be opened for writing
638 */
639 public void debug(String filename) throws IOException {
640 debug(new FileWriter(filename));
641 }
642
643 /**
644 * Enables logging to a stream what is read and written from and to
645 * the server. Logging can be enabled at any time. However, it is
646 * encouraged to start debugging before actually connecting the
647 * socket.
648 *
649 * @param out to write the log to
650 * @throws IOException if the file could not be opened for writing
651 */
652 public void debug(PrintStream out) throws IOException {
653 debug(new PrintWriter(out));
654 }
655
656 /**
657 * Enables logging to a stream what is read and written from and to
658 * the server. Logging can be enabled at any time. However, it is
659 * encouraged to start debugging before actually connecting the
660 * socket.
661 *
662 * @param out to write the log to
663 * @throws IOException if the file could not be opened for writing
664 */
665 public void debug(Writer out) throws IOException {
666 log = out;
667 debug = true;
668 }
669
670 /**
671 * Inner class that is used to write data on a normal stream as a
672 * blocked stream. A call to the flush() method will write a
673 * "final" block to the underlying stream. Non-final blocks are
674 * written as soon as one or more bytes would not fit in the
675 * current block any more. This allows to write to a block to it's
676 * full size, and then flush it explicitly to have a final block
677 * being written to the stream.
678 */
679 class BlockOutputStream extends FilterOutputStream {
680 private int writePos = 0;
681 private byte[] block = new byte[BLOCK];
682 private int blocksize = 0;
683
684 /**
685 * Constructs this BlockOutputStream, backed by the given
686 * OutputStream. A BufferedOutputStream is internally used.
687 */
688 public BlockOutputStream(OutputStream out) {
689 // always use a buffered stream, even though we know how
690 // much bytes to write/read, since this is just faster for
691 // some reason
692 super(new BufferedOutputStream(out));
693 }
694
695 @Override
696 public void flush() throws IOException {
697 // write the block (as final) then flush.
698 writeBlock(true);
699 out.flush();
700
701 // it's a bit nasty if an exception is thrown from the log,
702 // but ignoring it can be nasty as well, so it is decided to
703 // let it go so there is feedback about something going wrong
704 // it's a bit nasty if an exception is thrown from the log,
705 // but ignoring it can be nasty as well, so it is decided to
706 // let it go so there is feedback about something going wrong
707 if (debug) {
708 log.flush();
709 }
710 }
711
712 /**
713 * writeBlock puts the data in the block on the stream. The
714 * boolean last controls whether the block is sent with an
715 * indicator to note it is the last block of a sequence or not.
716 *
717 * @param last whether this is the last block
718 * @throws IOException if writing to the stream failed
719 */
720 public void writeBlock(boolean last) throws IOException {
721 if (last) {
722 // always fits, because of BLOCK's size
723 blocksize = (short)writePos;
724 // this is the last block, so encode least
725 // significant bit in the first byte (little-endian)
726 blklen[0] = (byte)(blocksize << 1 & 0xFF | 1);
727 blklen[1] = (byte)(blocksize >> 7);
728 } else {
729 // always fits, because of BLOCK's size
730 blocksize = (short)BLOCK;
731 // another block will follow, encode least
732 // significant bit in the first byte (little-endian)
733 blklen[0] = (byte)(blocksize << 1 & 0xFF);
734 blklen[1] = (byte)(blocksize >> 7);
735 }
736
737 out.write(blklen);
738
739 // write the actual block
740 out.write(block, 0, writePos);
741
742 if (debug) {
743 if (last) {
744 logTd("write final block: " + writePos + " bytes");
745 } else {
746 logTd("write block: " + writePos + " bytes");
747 }
748 logTx(new String(block, 0, writePos, "UTF-8"));
749 }
750
751 writePos = 0;
752 }
753
754 @Override
755 public void write(int b) throws IOException {
756 if (writePos == BLOCK) {
757 writeBlock(false);
758 }
759 block[writePos++] = (byte)b;
760 }
761
762 @Override
763 public void write(byte[] b) throws IOException {
764 write(b, 0, b.length);
765 }
766
767 @Override
768 public void write(byte[] b, int off, int len) throws IOException {
769 int t = 0;
770 while (len > 0) {
771 t = BLOCK - writePos;
772 if (len > t) {
773 System.arraycopy(b, off, block, writePos, t);
774 off += t;
775 len -= t;
776 writePos += t;
777 writeBlock(false);
778 } else {
779 System.arraycopy(b, off, block, writePos, len);
780 writePos += len;
781 break;
782 }
783 }
784 }
785
786 @Override
787 public void close() throws IOException {
788 // we don't want the flush() method to be called (default of
789 // the FilterOutputStream), so we close manually here
790 out.close();
791 }
792 }
793
794
795 /**
796 * Inner class that is used to make the data on the blocked stream
797 * available as a normal stream.
798 */
799 class BlockInputStream extends FilterInputStream {
800 private int readPos = 0;
801 private int blockLen = 0;
802 private byte[] block = new byte[BLOCK + 3]; // \n.\n
803
804 /**
805 * Constructs this BlockInputStream, backed by the given
806 * InputStream. A BufferedInputStream is internally used.
807 */
808 public BlockInputStream(InputStream in) {
809 // always use a buffered stream, even though we know how
810 // much bytes to write/read, since this is just faster for
811 // some reason
812 super(new BufferedInputStream(in));
813 }
814
815 @Override
816 public int available() {
817 return blockLen - readPos;
818 }
819
820 @Override
821 public boolean markSupported() {
822 return false;
823 }
824
825 @Override
826 public void mark(int readlimit) {
827 throw new AssertionError("Not implemented!");
828 }
829
830 @Override
831 public void reset() {
832 throw new AssertionError("Not implemented!");
833 }
834
835 /**
836 * Small wrapper to get a blocking variant of the read() method
837 * on the BufferedInputStream. We want to benefit from the
838 * Buffered pre-fetching, but not dealing with half blocks.
839 * Changing this class to be able to use the partially received
840 * data will greatly complicate matters, while a performance
841 * improvement is debatable given the relatively small size of
842 * our blocks. Maybe it does speed up on slower links, then
843 * consider this method a quick bug fix/workaround.
844 *
845 * @return false if reading the block failed due to EOF
846 */
847 private boolean _read(byte[] b, int len) throws IOException {
848 int s;
849 int off = 0;
850
851 while (len > 0) {
852 s = in.read(b, off, len);
853 if (s == -1) {
854 // if we have read something before, we should have been
855 // able to read the whole, so make this fatal
856 if (off > 0) {
857 if (debug) {
858 logRd("the following incomplete block was received:");
859 logRx(new String(b, 0, off, "UTF-8"));
860 }
861 throw new IOException("Read from " +
862 con.getInetAddress().getHostName() + ":" +
863 con.getPort() + ": Incomplete block read from stream");
864 }
865 if (debug)
866 logRd("server closed the connection (EOF)");
867 return false;
868 }
869 len -= s;
870 off += s;
871 }
872
873 return true;
874 }
875
876 /**
877 * Reads the next block on the stream into the internal buffer,
878 * or writes the prompt in the buffer.
879 *
880 * The blocked stream protocol consists of first a two byte
881 * integer indicating the length of the block, then the
882 * block, followed by another length + block. The end of
883 * such sequence is put in the last bit of the length, and
884 * hence this length should be shifted to the right to
885 * obtain the real length value first. We simply fetch
886 * blocks here as soon as they are needed for the stream's
887 * read methods.
888 *
889 * The user-flush, which is an implicit effect of the end of
890 * a block sequence, is communicated beyond the stream by
891 * inserting a prompt sequence on the stream after the last
892 * block. This method makes sure that a final block ends with a
893 * newline, if it doesn't already, in order to facilitate a
894 * Reader that is possibly chained to this InputStream.
895 *
896 * If the stream is not positioned correctly, hell will break
897 * loose.
898 */
899 private int readBlock() throws IOException {
900 // read next two bytes (short)
901 if (!_read(blklen, 2))
902 return(-1);
903
904 // Get the short-value and store its value in blockLen.
905 blockLen = (short)(
906 (blklen[0] & 0xFF) >> 1 |
907 (blklen[1] & 0xFF) << 7
908 );
909 readPos = 0;
910
911 if (debug) {
912 if ((blklen[0] & 0x1) == 1) {
913 logRd("read final block: " + blockLen + " bytes");
914 } else {
915 logRd("read new block: " + blockLen + " bytes");
916 }
917 }
918
919 // sanity check to avoid bad servers make us do an ugly
920 // stack trace
921 if (blockLen > block.length)
922 throw new AssertionError("Server sent a block " +
923 "larger than BLOCKsize: " +
924 blockLen + " > " + block.length);
925 if (!_read(block, blockLen))
926 return(-1);
927
928 if (debug)
929 logRx(new String(block, 0, blockLen, "UTF-8"));
930
931 // if this is the last block, make it end with a newline and
932 // prompt
933 if ((blklen[0] & 0x1) == 1) {
934 if (blockLen > 0 && block[blockLen - 1] != '\n') {
935 // to terminate the block in a Reader
936 block[blockLen++] = '\n';
937 }
938 // insert 'fake' flush
939 block[blockLen++] = BufferedMCLReader.PROMPT;
940 block[blockLen++] = '\n';
941 if (debug)
942 logRd("inserting prompt");
943 }
944
945 return(blockLen);
946 }
947
948 @Override
949 public int read() throws IOException {
950 if (available() == 0) {
951 if (readBlock() == -1)
952 return(-1);
953 }
954
955 if (debug)
956 logRx(new String(block, readPos, 1, "UTF-8"));
957 return (int)block[readPos++];
958 }
959
960 @Override
961 public int read(byte[] b) throws IOException {
962 return read(b, 0, b.length);
963 }
964
965 @Override
966 public int read(byte[] b, int off, int len) throws IOException {
967 int t;
968 int size = 0;
969 while (size < len) {
970 t = available();
971 if (t == 0) {
972 if (size != 0)
973 break;
974 if (readBlock() == -1) {
975 if (size == 0)
976 size = -1;
977 break;
978 }
979 t = available();
980 }
981 if (len > t) {
982 System.arraycopy(block, readPos, b, off, t);
983 off += t;
984 len -= t;
985 readPos += t;
986 size += t;
987 } else {
988 System.arraycopy(block, readPos, b, off, len);
989 readPos += len;
990 size += len;
991 break;
992 }
993 }
994 return size;
995 }
996
997 @Override
998 public long skip(long n) throws IOException {
999 long skip = n;
1000 int t = 0;
1001 while (skip > 0) {
1002 t = available();
1003 if (skip > t) {
1004 skip -= t;
1005 readPos += t;
1006 readBlock();
1007 } else {
1008 readPos += skip;
1009 break;
1010 }
1011 }
1012 return n;
1013 }
1014 }
1015
1016 /**
1017 * Closes the streams and socket connected to the server if
1018 * possible. If an error occurs during disconnecting it is ignored.
1019 */
1020 public synchronized void close() {
1021 try {
1022 if (reader != null) reader.close();
1023 if (writer != null) writer.close();
1024 if (fromMonet != null) fromMonet.close();
1025 if (toMonet != null) toMonet.close();
1026 if (con != null) con.close();
1027 if (debug && log instanceof FileWriter) log.close();
1028 } catch (IOException e) {
1029 // ignore it
1030 }
1031 }
1032
1033 /**
1034 * Destructor called by garbage collector before destroying this
1035 * object tries to disconnect the MonetDB connection if it has not
1036 * been disconnected already.
1037 */
1038 @Override
1039 protected void finalize() throws Throwable {
1040 close();
1041 super.finalize();
1042 }
1043
1044
1045 /**
1046 * Writes a logline tagged with a timestamp using the given string.
1047 * Used for debugging purposes only and represents a message that is
1048 * connected to writing to the socket. A logline might look like:
1049 * TX 152545124: Hello MonetDB!
1050 *
1051 * @param message the message to log
1052 * @throws IOException if an IO error occurs while writing to the logfile
1053 */
1054 private void logTx(String message) throws IOException {
1055 log.write("TX " + System.currentTimeMillis() +
1056 ": " + message + "\n");
1057 }
1058
1059 /**
1060 * Writes a logline tagged with a timestamp using the given string.
1061 * Lines written using this log method are tagged as "added
1062 * metadata" which is not strictly part of the data sent.
1063 *
1064 * @param message the message to log
1065 * @throws IOException if an IO error occurs while writing to the logfile
1066 */
1067 private void logTd(String message) throws IOException {
1068 log.write("TD " + System.currentTimeMillis() +
1069 ": " + message + "\n");
1070 }
1071
1072 /**
1073 * Writes a logline tagged with a timestamp using the given string,
1074 * and flushes afterwards. Used for debugging purposes only and
1075 * represents a message that is connected to reading from the
1076 * socket. The log is flushed after writing the line. A logline
1077 * might look like:
1078 * RX 152545124: Hi JDBC!
1079 *
1080 * @param message the message to log
1081 * @throws IOException if an IO error occurs while writing to the logfile
1082 */
1083 private void logRx(String message) throws IOException {
1084 log.write("RX " + System.currentTimeMillis() +
1085 ": " + message + "\n");
1086 log.flush();
1087 }
1088
1089 /**
1090 * Writes a logline tagged with a timestamp using the given string,
1091 * and flushes afterwards. Lines written using this log method are
1092 * tagged as "added metadata" which is not strictly part of the data
1093 * received.
1094 *
1095 * @param message the message to log
1096 * @throws IOException if an IO error occurs while writing to the logfile
1097 */
1098 private void logRd(String message) throws IOException {
1099 log.write("RD " + System.currentTimeMillis() +
1100 ": " + message + "\n");
1101 log.flush();
1102 }
1103 }