Mercurial > hg > monetdb-java
comparison src/main/java/nl/cwi/monetdb/mcl/net/MapiSocket.java @ 0:a5a898f6886c
Copy of MonetDB java directory changeset e6e32756ad31.
author | Sjoerd Mullender <sjoerd@acm.org> |
---|---|
date | Wed, 21 Sep 2016 09:34:48 +0200 (2016-09-21) |
parents | |
children | 7e0d71a22677 6cc63d6cb224 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:a5a898f6886c |
---|---|
1 /* | |
2 * This Source Code Form is subject to the terms of the Mozilla Public | |
3 * License, v. 2.0. If a copy of the MPL was not distributed with this | |
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. | |
5 * | |
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2016 MonetDB B.V. | |
7 */ | |
8 | |
9 package nl.cwi.monetdb.mcl.net; | |
10 | |
11 import java.io.BufferedInputStream; | |
12 import java.io.BufferedOutputStream; | |
13 import java.io.EOFException; | |
14 import java.io.FileWriter; | |
15 import java.io.FilterInputStream; | |
16 import java.io.FilterOutputStream; | |
17 import java.io.IOException; | |
18 import java.io.InputStream; | |
19 import java.io.OutputStream; | |
20 import java.io.PrintStream; | |
21 import java.io.PrintWriter; | |
22 import java.io.UnsupportedEncodingException; | |
23 import java.io.Writer; | |
24 import java.net.Socket; | |
25 import java.net.SocketException; | |
26 import java.net.URI; | |
27 import java.net.URISyntaxException; | |
28 import java.security.MessageDigest; | |
29 import java.security.NoSuchAlgorithmException; | |
30 import java.util.ArrayList; | |
31 import java.util.Arrays; | |
32 import java.util.HashSet; | |
33 import java.util.List; | |
34 import java.util.Set; | |
35 | |
36 import nl.cwi.monetdb.mcl.MCLException; | |
37 import nl.cwi.monetdb.mcl.io.BufferedMCLReader; | |
38 import nl.cwi.monetdb.mcl.io.BufferedMCLWriter; | |
39 import nl.cwi.monetdb.mcl.parser.MCLParseException; | |
40 | |
41 /** | |
42 * A Socket for communicating with the MonetDB database in MAPI block | |
43 * mode. | |
44 * | |
45 * The MapiSocket implements the protocol specifics of the MAPI block | |
46 * mode protocol, and interfaces it as a socket that delivers a | |
47 * BufferedReader and a BufferedWriter. Because logging in is an | |
48 * integral part of the MAPI protocol, the MapiSocket performs the login | |
49 * procedure. Like the Socket class, various options can be set before | |
50 * calling the connect() method to influence the login process. Only | |
51 * after a successful call to connect() the BufferedReader and | |
52 * BufferedWriter can be retrieved. | |
53 * <br /> | |
54 * For each line read, it is determined what type of line it is | |
55 * according to the MonetDB MAPI protocol. This results in a line to be | |
56 * PROMPT, HEADER, RESULT, ERROR or UNKNOWN. Use the getLineType() | |
57 * method on the BufferedMCLReader to retrieve the type of the last | |
58 * line read. | |
59 * | |
60 * For debugging purposes a socket level debugging is implemented where | |
61 * each and every interaction to and from the MonetDB server is logged | |
62 * to a file on disk.<br /> | |
63 * Incoming messages are prefixed by "RX" (received by the driver), | |
64 * outgoing messages by "TX" (transmitted by the driver). Special | |
65 * decoded non-human readable messages are prefixed with "RD" and "TD" | |
66 * instead. Following this two char prefix, a timestamp follows as the | |
67 * number of milliseconds since the UNIX epoch. The rest of the line is | |
68 * a String representation of the data sent or received. | |
69 * | |
70 * The general use of this Socket must be seen only in the full context | |
71 * of a MAPI connection to a server. It has the same ingredients as a | |
72 * normal Socket, allowing for seamless plugging. | |
73 * <pre> | |
74 * Socket \ / InputStream ----> (BufferedMCL)Reader | |
75 * > o < | |
76 * MapiSocket / \ OutputStream ----> (BufferedMCL)Writer | |
77 * </pre> | |
78 * The MapiSocket allows to retrieve Streams for communicating. They | |
79 * are interfaced, so they can be chained in any way. While the Socket | |
80 * transparently deals with how data is sent over the wire, the actual | |
81 * data read needs to be interpreted, for which a Reader/Writer | |
82 * interface is most suitable. In particular the BufferedMCL* | |
83 * implementations of those interfaces supply some extra functionality | |
84 * geared towards the format of the data. | |
85 * | |
86 * @author Fabian Groffen | |
87 * @version 4.1 | |
88 * @see nl.cwi.monetdb.mcl.io.BufferedMCLReader | |
89 * @see nl.cwi.monetdb.mcl.io.BufferedMCLWriter | |
90 */ | |
91 public final class MapiSocket { | |
92 /** The TCP Socket to mserver */ | |
93 private Socket con; | |
94 /** Stream from the Socket for reading */ | |
95 private InputStream fromMonet; | |
96 /** Stream from the Socket for writing */ | |
97 private OutputStream toMonet; | |
98 /** MCLReader on the InputStream */ | |
99 private BufferedMCLReader reader; | |
100 /** MCLWriter on the OutputStream */ | |
101 private BufferedMCLWriter writer; | |
102 /** protocol version of the connection */ | |
103 private int version; | |
104 | |
105 /** The database to connect to */ | |
106 private String database = null; | |
107 /** The language to connect with */ | |
108 private String language = "sql"; | |
109 /** The hash methods to use (null = default) */ | |
110 private String hash = null; | |
111 /** Whether we should follow redirects */ | |
112 private boolean followRedirects = true; | |
113 /** How many redirections do we follow until we're fed up with it? */ | |
114 private int ttl = 10; | |
115 /** Whether we are debugging or not */ | |
116 private boolean debug = false; | |
117 /** The Writer for the debug log-file */ | |
118 private Writer log; | |
119 | |
120 /** The blocksize (hardcoded in compliance with stream.mx) */ | |
121 public final static int BLOCK = 8 * 1024 - 2; | |
122 | |
123 /** A short in two bytes for holding the block size in bytes */ | |
124 private byte[] blklen = new byte[2]; | |
125 | |
/**
 * Creates a MapiSocket that is not yet connected. The underlying
 * TCP socket stays unset until connect() is called.
 */
public MapiSocket() {
    this.con = null;
}
132 | |
133 /** | |
134 * Sets the database to connect to. If database is null, a | |
135 * connection is made to the default database of the server. This | |
136 * is also the default. | |
137 * | |
138 * @param db the database | |
139 */ | |
140 public void setDatabase(String db) { | |
141 this.database = db; | |
142 } | |
143 | |
144 /** | |
145 * Sets the language to use for this connection. | |
146 * | |
147 * @param lang the language | |
148 */ | |
149 public void setLanguage(String lang) { | |
150 this.language = lang; | |
151 } | |
152 | |
153 /** | |
154 * Sets the hash method to use. Note that this method is intended | |
155 * for debugging purposes. Setting a hash method can yield in | |
156 * connection failures. Multiple hash methods can be given by | |
157 * separating the hashes by commas. | |
158 * DON'T USE THIS METHOD if you don't know what you're doing. | |
159 * | |
160 * @param hash the hash method to use | |
161 */ | |
162 public void setHash(String hash) { | |
163 this.hash = hash; | |
164 } | |
165 | |
166 /** | |
167 * Sets whether MCL redirections should be followed or not. If set | |
168 * to false, an MCLException will be thrown when a redirect is | |
169 * encountered during connect. The default bahaviour is to | |
170 * automatically follow redirects. | |
171 * | |
172 * @param r whether to follow redirects (true) or not (false) | |
173 */ | |
174 public void setFollowRedirects(boolean r) { | |
175 this.followRedirects = r; | |
176 } | |
177 | |
178 /** | |
179 * Sets the number of redirects that are followed when | |
180 * followRedirects is true. In order to avoid going into an endless | |
181 * loop due to some evil server, or another error, a maximum number | |
182 * of redirects that may be followed can be set here. Note that to | |
183 * disable the following of redirects you should use | |
184 * setFollowRedirects. | |
185 * | |
186 * @see #setFollowRedirects(boolean r) | |
187 * @param t the number of redirects before an exception is thrown | |
188 */ | |
189 public void setTTL(int t) { | |
190 this.ttl = t; | |
191 } | |
192 | |
193 /** | |
194 * Set the SO_TIMEOUT on the underlying Socket. When for some | |
195 * reason the connection to the database hangs, this setting can be | |
196 * useful to break out of this indefinite wait. | |
197 * This option must be enabled prior to entering the blocking | |
198 * operation to have effect. | |
199 * | |
200 * @param s The specified timeout, in milliseconds. A timeout | |
201 * of zero is interpreted as an infinite timeout. | |
202 * @throws SocketException Issue with the socket | |
203 */ | |
204 public void setSoTimeout(int s) throws SocketException { | |
205 // limit time to wait on blocking operations (0 = indefinite) | |
206 con.setSoTimeout(s); | |
207 } | |
208 | |
209 /** | |
210 * Gets the SO_TIMEOUT from the underlying Socket. | |
211 * | |
212 * @return the currently in use timeout in milliseconds | |
213 * @throws SocketException Issue with the socket | |
214 */ | |
215 public int getSoTimeout() throws SocketException { | |
216 return con.getSoTimeout(); | |
217 } | |
218 | |
219 /** | |
220 * Enables/disables debug | |
221 * | |
222 * @param debug Value to set | |
223 */ | |
224 public void setDebug(boolean debug) { | |
225 this.debug = debug; | |
226 } | |
227 | |
228 /** | |
229 * Connects to the given host and port, logging in as the given | |
230 * user. If followRedirect is false, a RedirectionException is | |
231 * thrown when a redirect is encountered. | |
232 * | |
233 * @param host the hostname, or null for the loopback address | |
234 * @param port the port number | |
235 * @param user the username | |
236 * @param pass the password | |
237 * @return A List with informational (warning) messages. If this | |
238 * list is empty; then there are no warnings. | |
239 * @throws IOException if an I/O error occurs when creating the | |
240 * socket | |
241 * @throws MCLParseException if bogus data is received | |
242 * @throws MCLException if an MCL related error occurs | |
243 */ | |
244 public List<String> connect(String host, int port, String user, String pass) | |
245 throws IOException, MCLParseException, MCLException { | |
246 // Wrap around the internal connect that needs to know if it | |
247 // should really make a TCP connection or not. | |
248 return connect(host, port, user, pass, true); | |
249 } | |
250 | |
251 private List<String> connect(String host, int port, String user, String pass, | |
252 boolean makeConnection) | |
253 throws IOException, MCLParseException, MCLException { | |
254 if (ttl-- <= 0) | |
255 throw new MCLException("Maximum number of redirects reached, aborting connection attempt. Sorry."); | |
256 | |
257 if (makeConnection) { | |
258 con = new Socket(host, port); | |
259 // set nodelay, as it greatly speeds up small messages (like we | |
260 // often do) | |
261 con.setTcpNoDelay(true); | |
262 | |
263 fromMonet = new BlockInputStream(con.getInputStream()); | |
264 toMonet = new BlockOutputStream(con.getOutputStream()); | |
265 try { | |
266 reader = new BufferedMCLReader(fromMonet, "UTF-8"); | |
267 writer = new BufferedMCLWriter(toMonet, "UTF-8"); | |
268 } catch (UnsupportedEncodingException e) { | |
269 throw new AssertionError(e.toString()); | |
270 } | |
271 writer.registerReader(reader); | |
272 } | |
273 | |
274 String c = reader.readLine(); | |
275 reader.waitForPrompt(); | |
276 writer.writeLine( | |
277 getChallengeResponse( | |
278 c, | |
279 user, | |
280 pass, | |
281 language, | |
282 database, | |
283 hash | |
284 ) | |
285 ); | |
286 | |
287 // read monet response till prompt | |
288 List<String> redirects = new ArrayList<String>(); | |
289 List<String> warns = new ArrayList<String>(); | |
290 String err = "", tmp; | |
291 int lineType; | |
292 do { | |
293 if ((tmp = reader.readLine()) == null) | |
294 throw new IOException("Read from " + | |
295 con.getInetAddress().getHostName() + ":" + | |
296 con.getPort() + ": End of stream reached"); | |
297 if ((lineType = reader.getLineType()) == BufferedMCLReader.ERROR) { | |
298 err += "\n" + tmp.substring(7); | |
299 } else if (lineType == BufferedMCLReader.INFO) { | |
300 warns.add(tmp.substring(1)); | |
301 } else if (lineType == BufferedMCLReader.REDIRECT) { | |
302 redirects.add(tmp.substring(1)); | |
303 } | |
304 } while (lineType != BufferedMCLReader.PROMPT); | |
305 if (err != "") { | |
306 close(); | |
307 throw new MCLException(err.trim()); | |
308 } | |
309 if (!redirects.isEmpty()) { | |
310 if (followRedirects) { | |
311 // Ok, server wants us to go somewhere else. The list | |
312 // might have multiple clues on where to go. For now we | |
313 // don't support anything intelligent but trying the | |
314 // first one. URI should be in form of: | |
315 // "mapi:monetdb://host:port/database?arg=value&..." | |
316 // or | |
317 // "mapi:merovingian://proxy?arg=value&..." | |
318 // note that the extra arguments must be obeyed in both | |
319 // cases | |
320 String suri = redirects.get(0).toString(); | |
321 if (!suri.startsWith("mapi:")) | |
322 throw new MCLException("unsupported redirect: " + suri); | |
323 | |
324 URI u; | |
325 try { | |
326 u = new URI(suri.substring(5)); | |
327 } catch (URISyntaxException e) { | |
328 throw new MCLParseException(e.toString()); | |
329 } | |
330 | |
331 tmp = u.getQuery(); | |
332 if (tmp != null) { | |
333 String args[] = tmp.split("&"); | |
334 for (int i = 0; i < args.length; i++) { | |
335 int pos = args[i].indexOf("="); | |
336 if (pos > 0) { | |
337 tmp = args[i].substring(0, pos); | |
338 if (tmp.equals("database")) { | |
339 tmp = args[i].substring(pos + 1); | |
340 if (!tmp.equals(database)) { | |
341 warns.add("redirect points to different " + | |
342 "database: " + tmp); | |
343 setDatabase(tmp); | |
344 } | |
345 } else if (tmp.equals("language")) { | |
346 tmp = args[i].substring(pos + 1); | |
347 warns.add("redirect specifies use of different language: " + tmp); | |
348 setLanguage(tmp); | |
349 } else if (tmp.equals("user")) { | |
350 tmp = args[i].substring(pos + 1); | |
351 if (!tmp.equals(user)) | |
352 warns.add("ignoring different username '" + tmp + "' set by " + | |
353 "redirect, what are the security implications?"); | |
354 } else if (tmp.equals("password")) { | |
355 warns.add("ignoring different password set by redirect, " + | |
356 "what are the security implications?"); | |
357 } else { | |
358 warns.add("ignoring unknown argument '" + tmp + "' from redirect"); | |
359 } | |
360 } else { | |
361 warns.add("ignoring illegal argument from redirect: " + | |
362 args[i]); | |
363 } | |
364 } | |
365 } | |
366 | |
367 if (u.getScheme().equals("monetdb")) { | |
368 // this is a redirect to another (monetdb) server, | |
369 // which means a full reconnect | |
370 // avoid the debug log being closed | |
371 if (debug) { | |
372 debug = false; | |
373 close(); | |
374 debug = true; | |
375 } else { | |
376 close(); | |
377 } | |
378 tmp = u.getPath(); | |
379 if (tmp != null && tmp.length() != 0) { | |
380 tmp = tmp.substring(1).trim(); | |
381 if (!tmp.isEmpty() && !tmp.equals(database)) { | |
382 warns.add("redirect points to different " + | |
383 "database: " + tmp); | |
384 setDatabase(tmp); | |
385 } | |
386 } | |
387 int p = u.getPort(); | |
388 warns.addAll(connect(u.getHost(), p == -1 ? port : p, | |
389 user, pass, true)); | |
390 warns.add("Redirect by " + host + ":" + port + " to " + suri); | |
391 } else if (u.getScheme().equals("merovingian")) { | |
392 // reuse this connection to inline connect to the | |
393 // right database that Merovingian proxies for us | |
394 warns.addAll(connect(host, port, user, pass, false)); | |
395 } else { | |
396 throw new MCLException("unsupported scheme in redirect: " + suri); | |
397 } | |
398 } else { | |
399 StringBuilder msg = new StringBuilder("The server sent a redirect for this connection:"); | |
400 for (String it : redirects) { | |
401 msg.append(" [" + it + "]"); | |
402 } | |
403 throw new MCLException(msg.toString()); | |
404 } | |
405 } | |
406 return warns; | |
407 } | |
408 | |
409 /** | |
410 * A little helper function that processes a challenge string, and | |
411 * returns a response string for the server. If the challenge | |
412 * string is null, a challengeless response is returned. | |
413 * | |
414 * @param chalstr the challenge string | |
415 * @param username the username to use | |
416 * @param password the password to use | |
417 * @param language the language to use | |
418 * @param database the database to connect to | |
419 * @param hash the hash method(s) to use, or NULL for all supported | |
420 * hashes | |
421 */ | |
422 private String getChallengeResponse( | |
423 String chalstr, | |
424 String username, | |
425 String password, | |
426 String language, | |
427 String database, | |
428 String hash | |
429 ) throws MCLParseException, MCLException, IOException { | |
430 String response; | |
431 String algo; | |
432 | |
433 // parse the challenge string, split it on ':' | |
434 String[] chaltok = chalstr.split(":"); | |
435 if (chaltok.length <= 4) throw | |
436 new MCLParseException("Server challenge string unusable! Challenge contains too few tokens: " + chalstr); | |
437 | |
438 // challenge string to use as salt/key | |
439 String challenge = chaltok[0]; | |
440 String servert = chaltok[1]; | |
441 try { | |
442 version = Integer.parseInt(chaltok[2].trim()); // protocol version | |
443 } catch (NumberFormatException e) { | |
444 throw new MCLParseException("Protocol version unparseable: " + chaltok[3]); | |
445 } | |
446 | |
447 // handle the challenge according to the version it is | |
448 switch (version) { | |
449 default: | |
450 throw new MCLException("Unsupported protocol version: " + version); | |
451 case 9: | |
452 // proto 9 is like 8, but uses a hash instead of the | |
453 // plain password, the server tells us which hash in the | |
454 // challenge after the byte-order | |
455 | |
456 /* NOTE: Java doesn't support RIPEMD160 :( */ | |
457 if (chaltok[5].equals("SHA512")) { | |
458 algo = "SHA-512"; | |
459 } else if (chaltok[5].equals("SHA384")) { | |
460 algo = "SHA-384"; | |
461 } else if (chaltok[5].equals("SHA256")) { | |
462 algo = "SHA-256"; | |
463 /* NOTE: Java doesn't support SHA-224 */ | |
464 } else if (chaltok[5].equals("SHA1")) { | |
465 algo = "SHA-1"; | |
466 } else if (chaltok[5].equals("MD5")) { | |
467 algo = "MD5"; | |
468 } else { | |
469 throw new MCLException("Unsupported password hash: " + | |
470 chaltok[5]); | |
471 } | |
472 | |
473 try { | |
474 MessageDigest md = MessageDigest.getInstance(algo); | |
475 md.update(password.getBytes("UTF-8")); | |
476 byte[] digest = md.digest(); | |
477 password = toHex(digest); | |
478 } catch (NoSuchAlgorithmException e) { | |
479 throw new AssertionError("internal error: " + e.toString()); | |
480 } catch (UnsupportedEncodingException e) { | |
481 throw new AssertionError("internal error: " + e.toString()); | |
482 } | |
483 | |
484 // proto 7 (finally) used the challenge and works with a | |
485 // password hash. The supported implementations come | |
486 // from the server challenge. We chose the best hash | |
487 // we can find, in the order SHA1, MD5, plain. Also, | |
488 // the byte-order is reported in the challenge string, | |
489 // which makes sense, since only blockmode is supported. | |
490 // proto 8 made this obsolete, but retained the | |
491 // byte-order report for future "binary" transports. In | |
492 // proto 8, the byte-order of the blocks is always little | |
493 // endian because most machines today are. | |
494 String hashes = (hash == null ? chaltok[3] : hash); | |
495 Set<String> hashesSet = new HashSet<String>(Arrays.asList(hashes.toUpperCase().split("[, ]"))); | |
496 | |
497 // if we deal with merovingian, mask our credentials | |
498 if (servert.equals("merovingian") && !language.equals("control")) { | |
499 username = "merovingian"; | |
500 password = "merovingian"; | |
501 } | |
502 String pwhash; | |
503 algo = null; | |
504 | |
505 if (hashesSet.contains("SHA512")) { | |
506 algo = "SHA-512"; | |
507 pwhash = "{SHA512}"; | |
508 } else if (hashesSet.contains("SHA384")) { | |
509 algo = "SHA-384"; | |
510 pwhash = "{SHA384}"; | |
511 } else if (hashesSet.contains("SHA256")) { | |
512 algo = "SHA-256"; | |
513 pwhash = "{SHA256}"; | |
514 } else if (hashesSet.contains("SHA1")) { | |
515 algo = "SHA-1"; | |
516 pwhash = "{SHA1}"; | |
517 } else if (hashesSet.contains("MD5")) { | |
518 algo = "MD5"; | |
519 pwhash = "{MD5}"; | |
520 } else { | |
521 throw new MCLException("no supported password hashes in " + hashes); | |
522 } | |
523 if (algo != null) { | |
524 try { | |
525 MessageDigest md = MessageDigest.getInstance(algo); | |
526 md.update(password.getBytes("UTF-8")); | |
527 md.update(challenge.getBytes("UTF-8")); | |
528 byte[] digest = md.digest(); | |
529 pwhash += toHex(digest); | |
530 } catch (NoSuchAlgorithmException e) { | |
531 throw new AssertionError("internal error: " + e.toString()); | |
532 } catch (UnsupportedEncodingException e) { | |
533 throw new AssertionError("internal error: " + e.toString()); | |
534 } | |
535 } | |
536 // TODO: some day when we need this, we should store | |
537 // this | |
538 if (chaltok[4].equals("BIG")) { | |
539 // byte-order of server is big-endian | |
540 } else if (chaltok[4].equals("LIT")) { | |
541 // byte-order of server is little-endian | |
542 } else { | |
543 throw new MCLParseException("Invalid byte-order: " + chaltok[5]); | |
544 } | |
545 | |
546 // generate response | |
547 response = "BIG:"; // JVM byte-order is big-endian | |
548 response += username + ":" + pwhash + ":" + language; | |
549 response += ":" + (database == null ? "" : database) + ":"; | |
550 | |
551 return response; | |
552 } | |
553 } | |
554 | |
555 private static char hexChar(int n) { | |
556 return (n > 9) | |
557 ? (char) ('a' + (n - 10)) | |
558 : (char) ('0' + n); | |
559 } | |
560 | |
561 /** | |
562 * Small helper method to convert a byte string to a hexadecimal | |
563 * string representation. | |
564 * | |
565 * @param digest the byte array to convert | |
566 * @return the byte array as hexadecimal string | |
567 */ | |
568 private static String toHex(byte[] digest) { | |
569 char[] result = new char[digest.length * 2]; | |
570 int pos = 0; | |
571 for (int i = 0; i < digest.length; i++) { | |
572 result[pos++] = hexChar((digest[i] & 0xf0) >> 4); | |
573 result[pos++] = hexChar(digest[i] & 0x0f); | |
574 } | |
575 return new String(result); | |
576 } | |
577 | |
578 /** | |
579 * Returns an InputStream that reads from this open connection on | |
580 * the MapiSocket. | |
581 * | |
582 * @return an input stream that reads from this open connection | |
583 */ | |
584 public InputStream getInputStream() { | |
585 return fromMonet; | |
586 } | |
587 | |
588 /** | |
589 * Returns an output stream for this MapiSocket. | |
590 * | |
591 * @return an output stream for writing bytes to this MapiSocket | |
592 */ | |
593 public OutputStream getOutputStream() { | |
594 return toMonet; | |
595 } | |
596 | |
597 /** | |
598 * Returns a Reader for this MapiSocket. The Reader is a | |
599 * BufferedMCLReader which does protocol interpretation of the | |
600 * BlockInputStream produced by this MapiSocket. | |
601 * | |
602 * @return a BufferedMCLReader connected to this MapiSocket | |
603 */ | |
604 public BufferedMCLReader getReader() { | |
605 return reader; | |
606 } | |
607 | |
608 /** | |
609 * Returns a Writer for this MapiSocket. The Writer is a | |
610 * BufferedMCLWriter which produces protocol compatible data blocks | |
611 * that the BlockOutputStream can properly translate into blocks. | |
612 * | |
613 * @return a BufferedMCLWriter connected to this MapiSocket | |
614 */ | |
615 public BufferedMCLWriter getWriter() { | |
616 return writer; | |
617 } | |
618 | |
619 /** | |
620 * Returns the mapi protocol version used by this socket. The | |
621 * protocol version depends on the server being used. Users of the | |
622 * MapiSocket should check this version to act appropriately. | |
623 * | |
624 * @return the mapi protocol version | |
625 */ | |
626 public int getProtocolVersion() { | |
627 return version; | |
628 } | |
629 | |
630 /** | |
631 * Enables logging to a file what is read and written from and to | |
632 * the server. Logging can be enabled at any time. However, it is | |
633 * encouraged to start debugging before actually connecting the | |
634 * socket. | |
635 * | |
636 * @param filename the name of the file to write to | |
637 * @throws IOException if the file could not be opened for writing | |
638 */ | |
639 public void debug(String filename) throws IOException { | |
640 debug(new FileWriter(filename)); | |
641 } | |
642 | |
643 /** | |
644 * Enables logging to a stream what is read and written from and to | |
645 * the server. Logging can be enabled at any time. However, it is | |
646 * encouraged to start debugging before actually connecting the | |
647 * socket. | |
648 * | |
649 * @param out to write the log to | |
650 * @throws IOException if the file could not be opened for writing | |
651 */ | |
652 public void debug(PrintStream out) throws IOException { | |
653 debug(new PrintWriter(out)); | |
654 } | |
655 | |
656 /** | |
657 * Enables logging to a stream what is read and written from and to | |
658 * the server. Logging can be enabled at any time. However, it is | |
659 * encouraged to start debugging before actually connecting the | |
660 * socket. | |
661 * | |
662 * @param out to write the log to | |
663 * @throws IOException if the file could not be opened for writing | |
664 */ | |
665 public void debug(Writer out) throws IOException { | |
666 log = out; | |
667 debug = true; | |
668 } | |
669 | |
670 /** | |
671 * Inner class that is used to write data on a normal stream as a | |
672 * blocked stream. A call to the flush() method will write a | |
673 * "final" block to the underlying stream. Non-final blocks are | |
674 * written as soon as one or more bytes would not fit in the | |
675 * current block any more. This allows to write to a block to it's | |
676 * full size, and then flush it explicitly to have a final block | |
677 * being written to the stream. | |
678 */ | |
679 class BlockOutputStream extends FilterOutputStream { | |
680 private int writePos = 0; | |
681 private byte[] block = new byte[BLOCK]; | |
682 private int blocksize = 0; | |
683 | |
684 /** | |
685 * Constructs this BlockOutputStream, backed by the given | |
686 * OutputStream. A BufferedOutputStream is internally used. | |
687 */ | |
688 public BlockOutputStream(OutputStream out) { | |
689 // always use a buffered stream, even though we know how | |
690 // much bytes to write/read, since this is just faster for | |
691 // some reason | |
692 super(new BufferedOutputStream(out)); | |
693 } | |
694 | |
695 @Override | |
696 public void flush() throws IOException { | |
697 // write the block (as final) then flush. | |
698 writeBlock(true); | |
699 out.flush(); | |
700 | |
701 // it's a bit nasty if an exception is thrown from the log, | |
702 // but ignoring it can be nasty as well, so it is decided to | |
703 // let it go so there is feedback about something going wrong | |
704 // it's a bit nasty if an exception is thrown from the log, | |
705 // but ignoring it can be nasty as well, so it is decided to | |
706 // let it go so there is feedback about something going wrong | |
707 if (debug) { | |
708 log.flush(); | |
709 } | |
710 } | |
711 | |
712 /** | |
713 * writeBlock puts the data in the block on the stream. The | |
714 * boolean last controls whether the block is sent with an | |
715 * indicator to note it is the last block of a sequence or not. | |
716 * | |
717 * @param last whether this is the last block | |
718 * @throws IOException if writing to the stream failed | |
719 */ | |
720 public void writeBlock(boolean last) throws IOException { | |
721 if (last) { | |
722 // always fits, because of BLOCK's size | |
723 blocksize = (short)writePos; | |
724 // this is the last block, so encode least | |
725 // significant bit in the first byte (little-endian) | |
726 blklen[0] = (byte)(blocksize << 1 & 0xFF | 1); | |
727 blklen[1] = (byte)(blocksize >> 7); | |
728 } else { | |
729 // always fits, because of BLOCK's size | |
730 blocksize = (short)BLOCK; | |
731 // another block will follow, encode least | |
732 // significant bit in the first byte (little-endian) | |
733 blklen[0] = (byte)(blocksize << 1 & 0xFF); | |
734 blklen[1] = (byte)(blocksize >> 7); | |
735 } | |
736 | |
737 out.write(blklen); | |
738 | |
739 // write the actual block | |
740 out.write(block, 0, writePos); | |
741 | |
742 if (debug) { | |
743 if (last) { | |
744 logTd("write final block: " + writePos + " bytes"); | |
745 } else { | |
746 logTd("write block: " + writePos + " bytes"); | |
747 } | |
748 logTx(new String(block, 0, writePos, "UTF-8")); | |
749 } | |
750 | |
751 writePos = 0; | |
752 } | |
753 | |
754 @Override | |
755 public void write(int b) throws IOException { | |
756 if (writePos == BLOCK) { | |
757 writeBlock(false); | |
758 } | |
759 block[writePos++] = (byte)b; | |
760 } | |
761 | |
762 @Override | |
763 public void write(byte[] b) throws IOException { | |
764 write(b, 0, b.length); | |
765 } | |
766 | |
767 @Override | |
768 public void write(byte[] b, int off, int len) throws IOException { | |
769 int t = 0; | |
770 while (len > 0) { | |
771 t = BLOCK - writePos; | |
772 if (len > t) { | |
773 System.arraycopy(b, off, block, writePos, t); | |
774 off += t; | |
775 len -= t; | |
776 writePos += t; | |
777 writeBlock(false); | |
778 } else { | |
779 System.arraycopy(b, off, block, writePos, len); | |
780 writePos += len; | |
781 break; | |
782 } | |
783 } | |
784 } | |
785 | |
786 @Override | |
787 public void close() throws IOException { | |
788 // we don't want the flush() method to be called (default of | |
789 // the FilterOutputStream), so we close manually here | |
790 out.close(); | |
791 } | |
792 } | |
793 | |
794 | |
795 /** | |
796 * Inner class that is used to make the data on the blocked stream | |
797 * available as a normal stream. | |
798 */ | |
799 class BlockInputStream extends FilterInputStream { | |
800 private int readPos = 0; | |
801 private int blockLen = 0; | |
802 private byte[] block = new byte[BLOCK + 3]; // \n.\n | |
803 | |
804 /** | |
805 * Constructs this BlockInputStream, backed by the given | |
806 * InputStream. A BufferedInputStream is internally used. | |
807 */ | |
808 public BlockInputStream(InputStream in) { | |
809 // always use a buffered stream, even though we know how | |
810 // much bytes to write/read, since this is just faster for | |
811 // some reason | |
812 super(new BufferedInputStream(in)); | |
813 } | |
814 | |
815 @Override | |
816 public int available() { | |
817 return blockLen - readPos; | |
818 } | |
819 | |
820 @Override | |
821 public boolean markSupported() { | |
822 return false; | |
823 } | |
824 | |
825 @Override | |
826 public void mark(int readlimit) { | |
827 throw new AssertionError("Not implemented!"); | |
828 } | |
829 | |
830 @Override | |
831 public void reset() { | |
832 throw new AssertionError("Not implemented!"); | |
833 } | |
834 | |
835 /** | |
836 * Small wrapper to get a blocking variant of the read() method | |
837 * on the BufferedInputStream. We want to benefit from the | |
838 * Buffered pre-fetching, but not dealing with half blocks. | |
839 * Changing this class to be able to use the partially received | |
840 * data will greatly complicate matters, while a performance | |
841 * improvement is debatable given the relatively small size of | |
842 * our blocks. Maybe it does speed up on slower links, then | |
843 * consider this method a quick bug fix/workaround. | |
844 * | |
845 * @return false if reading the block failed due to EOF | |
846 */ | |
847 private boolean _read(byte[] b, int len) throws IOException { | |
848 int s; | |
849 int off = 0; | |
850 | |
851 while (len > 0) { | |
852 s = in.read(b, off, len); | |
853 if (s == -1) { | |
854 // if we have read something before, we should have been | |
855 // able to read the whole, so make this fatal | |
856 if (off > 0) { | |
857 if (debug) { | |
858 logRd("the following incomplete block was received:"); | |
859 logRx(new String(b, 0, off, "UTF-8")); | |
860 } | |
861 throw new IOException("Read from " + | |
862 con.getInetAddress().getHostName() + ":" + | |
863 con.getPort() + ": Incomplete block read from stream"); | |
864 } | |
865 if (debug) | |
866 logRd("server closed the connection (EOF)"); | |
867 return false; | |
868 } | |
869 len -= s; | |
870 off += s; | |
871 } | |
872 | |
873 return true; | |
874 } | |
875 | |
876 /** | |
877 * Reads the next block on the stream into the internal buffer, | |
878 * or writes the prompt in the buffer. | |
879 * | |
880 * The blocked stream protocol consists of first a two byte | |
881 * integer indicating the length of the block, then the | |
882 * block, followed by another length + block. The end of | |
883 * such sequence is put in the last bit of the length, and | |
884 * hence this length should be shifted to the right to | |
885 * obtain the real length value first. We simply fetch | |
886 * blocks here as soon as they are needed for the stream's | |
887 * read methods. | |
888 * | |
889 * The user-flush, which is an implicit effect of the end of | |
890 * a block sequence, is communicated beyond the stream by | |
891 * inserting a prompt sequence on the stream after the last | |
892 * block. This method makes sure that a final block ends with a | |
893 * newline, if it doesn't already, in order to facilitate a | |
894 * Reader that is possibly chained to this InputStream. | |
895 * | |
896 * If the stream is not positioned correctly, hell will break | |
897 * loose. | |
898 */ | |
899 private int readBlock() throws IOException { | |
900 // read next two bytes (short) | |
901 if (!_read(blklen, 2)) | |
902 return(-1); | |
903 | |
904 // Get the short-value and store its value in blockLen. | |
905 blockLen = (short)( | |
906 (blklen[0] & 0xFF) >> 1 | | |
907 (blklen[1] & 0xFF) << 7 | |
908 ); | |
909 readPos = 0; | |
910 | |
911 if (debug) { | |
912 if ((blklen[0] & 0x1) == 1) { | |
913 logRd("read final block: " + blockLen + " bytes"); | |
914 } else { | |
915 logRd("read new block: " + blockLen + " bytes"); | |
916 } | |
917 } | |
918 | |
919 // sanity check to avoid bad servers make us do an ugly | |
920 // stack trace | |
921 if (blockLen > block.length) | |
922 throw new AssertionError("Server sent a block " + | |
923 "larger than BLOCKsize: " + | |
924 blockLen + " > " + block.length); | |
925 if (!_read(block, blockLen)) | |
926 return(-1); | |
927 | |
928 if (debug) | |
929 logRx(new String(block, 0, blockLen, "UTF-8")); | |
930 | |
931 // if this is the last block, make it end with a newline and | |
932 // prompt | |
933 if ((blklen[0] & 0x1) == 1) { | |
934 if (blockLen > 0 && block[blockLen - 1] != '\n') { | |
935 // to terminate the block in a Reader | |
936 block[blockLen++] = '\n'; | |
937 } | |
938 // insert 'fake' flush | |
939 block[blockLen++] = BufferedMCLReader.PROMPT; | |
940 block[blockLen++] = '\n'; | |
941 if (debug) | |
942 logRd("inserting prompt"); | |
943 } | |
944 | |
945 return(blockLen); | |
946 } | |
947 | |
948 @Override | |
949 public int read() throws IOException { | |
950 if (available() == 0) { | |
951 if (readBlock() == -1) | |
952 return(-1); | |
953 } | |
954 | |
955 if (debug) | |
956 logRx(new String(block, readPos, 1, "UTF-8")); | |
957 return (int)block[readPos++]; | |
958 } | |
959 | |
960 @Override | |
961 public int read(byte[] b) throws IOException { | |
962 return read(b, 0, b.length); | |
963 } | |
964 | |
965 @Override | |
966 public int read(byte[] b, int off, int len) throws IOException { | |
967 int t; | |
968 int size = 0; | |
969 while (size < len) { | |
970 t = available(); | |
971 if (t == 0) { | |
972 if (size != 0) | |
973 break; | |
974 if (readBlock() == -1) { | |
975 if (size == 0) | |
976 size = -1; | |
977 break; | |
978 } | |
979 t = available(); | |
980 } | |
981 if (len > t) { | |
982 System.arraycopy(block, readPos, b, off, t); | |
983 off += t; | |
984 len -= t; | |
985 readPos += t; | |
986 size += t; | |
987 } else { | |
988 System.arraycopy(block, readPos, b, off, len); | |
989 readPos += len; | |
990 size += len; | |
991 break; | |
992 } | |
993 } | |
994 return size; | |
995 } | |
996 | |
997 @Override | |
998 public long skip(long n) throws IOException { | |
999 long skip = n; | |
1000 int t = 0; | |
1001 while (skip > 0) { | |
1002 t = available(); | |
1003 if (skip > t) { | |
1004 skip -= t; | |
1005 readPos += t; | |
1006 readBlock(); | |
1007 } else { | |
1008 readPos += skip; | |
1009 break; | |
1010 } | |
1011 } | |
1012 return n; | |
1013 } | |
1014 } | |
1015 | |
1016 /** | |
1017 * Closes the streams and socket connected to the server if | |
1018 * possible. If an error occurs during disconnecting it is ignored. | |
1019 */ | |
1020 public synchronized void close() { | |
1021 try { | |
1022 if (reader != null) reader.close(); | |
1023 if (writer != null) writer.close(); | |
1024 if (fromMonet != null) fromMonet.close(); | |
1025 if (toMonet != null) toMonet.close(); | |
1026 if (con != null) con.close(); | |
1027 if (debug && log instanceof FileWriter) log.close(); | |
1028 } catch (IOException e) { | |
1029 // ignore it | |
1030 } | |
1031 } | |
1032 | |
1033 /** | |
1034 * Destructor called by garbage collector before destroying this | |
1035 * object tries to disconnect the MonetDB connection if it has not | |
1036 * been disconnected already. | |
1037 */ | |
1038 @Override | |
1039 protected void finalize() throws Throwable { | |
1040 close(); | |
1041 super.finalize(); | |
1042 } | |
1043 | |
1044 | |
1045 /** | |
1046 * Writes a logline tagged with a timestamp using the given string. | |
1047 * Used for debugging purposes only and represents a message that is | |
1048 * connected to writing to the socket. A logline might look like: | |
1049 * TX 152545124: Hello MonetDB! | |
1050 * | |
1051 * @param message the message to log | |
1052 * @throws IOException if an IO error occurs while writing to the logfile | |
1053 */ | |
1054 private void logTx(String message) throws IOException { | |
1055 log.write("TX " + System.currentTimeMillis() + | |
1056 ": " + message + "\n"); | |
1057 } | |
1058 | |
1059 /** | |
1060 * Writes a logline tagged with a timestamp using the given string. | |
1061 * Lines written using this log method are tagged as "added | |
1062 * metadata" which is not strictly part of the data sent. | |
1063 * | |
1064 * @param message the message to log | |
1065 * @throws IOException if an IO error occurs while writing to the logfile | |
1066 */ | |
1067 private void logTd(String message) throws IOException { | |
1068 log.write("TD " + System.currentTimeMillis() + | |
1069 ": " + message + "\n"); | |
1070 } | |
1071 | |
1072 /** | |
1073 * Writes a logline tagged with a timestamp using the given string, | |
1074 * and flushes afterwards. Used for debugging purposes only and | |
1075 * represents a message that is connected to reading from the | |
1076 * socket. The log is flushed after writing the line. A logline | |
1077 * might look like: | |
1078 * RX 152545124: Hi JDBC! | |
1079 * | |
1080 * @param message the message to log | |
1081 * @throws IOException if an IO error occurs while writing to the logfile | |
1082 */ | |
1083 private void logRx(String message) throws IOException { | |
1084 log.write("RX " + System.currentTimeMillis() + | |
1085 ": " + message + "\n"); | |
1086 log.flush(); | |
1087 } | |
1088 | |
1089 /** | |
1090 * Writes a logline tagged with a timestamp using the given string, | |
1091 * and flushes afterwards. Lines written using this log method are | |
1092 * tagged as "added metadata" which is not strictly part of the data | |
1093 * received. | |
1094 * | |
1095 * @param message the message to log | |
1096 * @throws IOException if an IO error occurs while writing to the logfile | |
1097 */ | |
1098 private void logRd(String message) throws IOException { | |
1099 log.write("RD " + System.currentTimeMillis() + | |
1100 ": " + message + "\n"); | |
1101 log.flush(); | |
1102 } | |
1103 } |