Mercurial > hg > monetdb-java
changeset 616:65641a7cea31
Implement line ending conversion for downloads
MonetConnection.Download#getStream returns an InputStream which
converts line endings when in text mode.
The default line ending is the platform line ending, but that can be
changed with Download#setLineSeparator. Setting it to \n can be a useful
optimization if you don't need the \r's anyway.
author | Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com> |
---|---|
date | Wed, 19 Jan 2022 14:58:01 +0100 (2022-01-19) |
parents | 34a15cd8cfc2 |
children | 15eb17b911a5 |
files | src/main/java/org/monetdb/jdbc/MonetConnection.java src/main/java/org/monetdb/mcl/net/MapiSocket.java tests/OnClientTester.java |
diffstat | 3 files changed, 143 insertions(+), 20 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/org/monetdb/jdbc/MonetConnection.java +++ b/src/main/java/org/monetdb/jdbc/MonetConnection.java @@ -3225,7 +3225,9 @@ public class MonetConnection } else if (transferCommand.startsWith("rb ")) { return handleUpload(transferCommand.substring(3), false, 0); } else if (transferCommand.startsWith("w ")) { - return handleDownload(transferCommand.substring(2)); + return handleDownload(transferCommand.substring(2), true); + } else if (transferCommand.startsWith("wb ")) { + return handleDownload(transferCommand.substring(2), false); } return "JDBC does not support this file transfer yet: " + transferCommand; } @@ -3250,12 +3252,12 @@ public class MonetConnection return handle.getError(); } - private String handleDownload(final String path) throws IOException { + private String handleDownload(final String path, boolean textMode) throws IOException { if (downloadHandler == null) { return "No file download handler has been registered with the JDBC driver"; } - final Download handle = new Download(server); + final Download handle = new Download(server, textMode); try { downloadHandler.handleDownload(handle, path, true); if (!handle.hasBeenUsed()) { @@ -3467,11 +3469,16 @@ public class MonetConnection */ public static class Download { private final MapiSocket server; + private boolean prependCr; private MapiSocket.DownloadStream stream = null; private String error = null; - Download(MapiSocket server) { + Download(MapiSocket server, boolean textMode) { this.server = server; + prependCr = false; + if (textMode) { + setLineSeparator(System.lineSeparator()); + } } /** @@ -3497,14 +3504,16 @@ public class MonetConnection /** * Get an {@link InputStream} to read data from. * - * Textual data is UTF-8 encoded. + * Textual data is UTF-8 encoded. If the download is in text mode, line endings + * are converted according to {@link java.lang.System#lineSeparator()}. + * This can be overridden with {@link Download#setLineSeparator(String)}. 
*/ public InputStream getStream() throws IOException { if (error != null) { throw new IOException("cannot receive data after error has been sent"); } if (stream == null) { - stream = server.downloadStream(); + stream = server.downloadStream(prependCr); server.getOutputStream().flush(); } return stream; @@ -3568,6 +3577,21 @@ public class MonetConnection } } } + + /** + * Set the line endings used in the stream returned by {@link Download#getStream()} + * @param sep separator to use + * @throws IllegalArgumentException if sep is neither "\n" nor "\r\n" + */ + public void setLineSeparator(String sep) { + if ("\n".equals(sep)) { + prependCr = false; + } else if ("\r\n".equals(sep)) { + prependCr = true; + } else { + throw new IllegalArgumentException("sep must be \n or \r\n"); + } + } } public static class StripCrLfStream extends FilterOutputStream {
--- a/src/main/java/org/monetdb/mcl/net/MapiSocket.java +++ b/src/main/java/org/monetdb/mcl/net/MapiSocket.java @@ -1198,9 +1198,10 @@ public class MapiSocket { /* cannot (yet * Return a DownloadStream for use with for example COPY INTO filename ON CLIENT * * Building block for {@link org.monetdb.jdbc.MonetConnection.DownloadHandler}. + * @param prependCr convert \n to \r\n */ - public DownloadStream downloadStream() { - return new DownloadStream(fromMonet.getRaw(), toMonet); + public DownloadStream downloadStream(boolean prependCr) { + return new DownloadStream(fromMonet.getRaw(), toMonet, prependCr); } /** @@ -1379,12 +1380,15 @@ public class MapiSocket { /* cannot (yet public static class DownloadStream extends InputStream { private final BlockInputStream.Raw rawIn; private final OutputStream out; + private final boolean prependCr; private boolean endBlockSeen = false; private boolean closed = false; + private boolean newlinePending = false; // used for crlf conversion - DownloadStream(BlockInputStream.Raw rawIn, OutputStream out) { + DownloadStream(BlockInputStream.Raw rawIn, OutputStream out, boolean prependCr) { this.rawIn = rawIn; this.out = out; + this.prependCr = prependCr; } void nextBlock() throws IOException { @@ -1422,23 +1426,55 @@ public class MapiSocket { /* cannot (yet } @Override - public int read(final byte[] b, int off, int len) throws IOException { + public int read(final byte[] dest, int off, int len) throws IOException { final int origOff = off; - while (len > 0) { - int chunk = Integer.min(len, rawIn.getLength() - rawIn.getPosition()); - if (chunk > 0) { - // make progress copying some bytes - System.arraycopy(rawIn.getBytes(), rawIn.getPosition(), b, off, chunk); - off += chunk; - rawIn.consume(chunk); - len -= chunk; - } else { - // make progress fetching data + int end = off + len; + + while (off < end) { + // minimum of what's requested and what we have in stock + int chunk = Integer.min(end - off, rawIn.getLength() - 
rawIn.getPosition()); + assert chunk >= 0; + if (chunk == 0) { + // make progress by fetching more data if (endBlockSeen) break; nextBlock(); + continue; + } + // make progress copying some bytes + if (!prependCr) { + // no conversion needed, use arraycopy + System.arraycopy(rawIn.getBytes(), rawIn.getPosition(), dest, off, chunk); + off += chunk; + rawIn.consume(chunk); + } else { + int chunkEnd = off + chunk; + if (newlinePending && off < chunkEnd) { + // we were in the middle of a line ending conversion + dest[off++] = '\n'; + newlinePending = false; + } + while (off < chunkEnd) { + byte b = rawIn.getBytes()[rawIn.consume(1)]; + if (b != '\n') { + dest[off++] = b; + } else if (chunkEnd - off >= 2) { + dest[off++] = '\r'; + dest[off++] = '\n'; + } else { + dest[off++] = '\r'; + newlinePending = true; + break; + } + } } } + + if (off < end && newlinePending) { + dest[off++] = '\n'; + newlinePending = false; + } + if (off == origOff && endBlockSeen) return -1; else
--- a/tests/OnClientTester.java +++ b/tests/OnClientTester.java @@ -28,6 +28,7 @@ import java.sql.Statement; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.Random; import java.util.zip.GZIPOutputStream; import static java.nio.file.StandardOpenOption.CREATE_NEW; @@ -147,6 +148,8 @@ public final class OnClientTester { test_LargeUpload(); if (isSelected("LargeDownload")) test_LargeDownload(); + if (isSelected("DownloadCrLf")) + test_DownloadCrLf(); if (isSelected("UploadFromStream")) test_UploadFromStream(); if (isSelected("UploadFromReader")) @@ -602,6 +605,61 @@ public final class OnClientTester { exitTest(); } + private void test_DownloadCrLf() throws SQLException, Failure, IOException { + + // This tests forces line ending conversion and reads in small batches, hoping to trigger corner cases + + initTest("test_DownloadCrLf"); + prepare(); + update("ALTER TABLE foo DROP COLUMN t"); + update("ALTER TABLE foo ADD COLUMN j INT"); + update("INSERT INTO foo SELECT rand() % CASE WHEN value % 10 = 0 THEN 1000 ELSE 10 END AS i, 0 AS j FROM generate_series(0, 500000)"); + ByteArrayOutputStream target = new ByteArrayOutputStream(); + Random rng = new Random(42); + DownloadHandler handler = (handle, name, textMode) -> { + handle.setLineSeparator("\r\n"); + InputStream s = handle.getStream(); + byte[] buf = new byte[10]; + boolean expectEof = false; + for (;;) { + int n = rng.nextInt(buf.length - 1) + 1; + int nread = s.read(buf, 0, n); + if (nread < 0) { + break; + } + target.write(buf, 0, nread); + } + + }; + conn.setDownloadHandler(handler); + update("COPY SELECT * FROM foo INTO 'banana' ON CLIENT"); + // go to String instead of byte[] because Strings have handy replace methods. + String result = new String(target.toByteArray(), StandardCharsets.UTF_8); + + // It should contain only \r\n's, no lonely \r's or \n's. 
+ String replaced = result.replaceAll("\r\n", "XX"); + + assertEq("Index of first lonely \\r", -1, replaced.indexOf('\r')); + assertEq("Index of first lonely \\n", -1, replaced.indexOf('\n')); + + String withoutData = result.replaceAll("[0-9]", ""); + Files.writeString(Path.of("/tmp/x.csv"), withoutData, StandardCharsets.UTF_8); + assertEq("Length after dropping data, modulo 3", 0, withoutData.length() % 3); + for (int i = 0; i < withoutData.length(); i += 3) { + String sub = withoutData.substring(i, i+3); + if (!sub.equals("|\r\n")) { + fail(String.format( + "At index %d out of %d in the skeleton (=digits removed) we find <%02x %02x %02x> instead of <7c 0d 0a>", + i, withoutData.length(), + (int)sub.charAt(0), (int)sub.charAt(1), (int)sub.charAt(2))); + } + } + // only to show some succesful output if the above succeeds + assertEq("Every 3-byte normalized chunk", "|\\r\\n", "|\\r\\n"); + + exitTest(); + } + private void test_UploadFromStream() throws SQLException, Failure { initTest("test_UploadFromStream"); prepare(); @@ -894,6 +952,11 @@ public final class OnClientTester { /* utility methods */ + private void say(String message) throws Failure { + outBuffer.append(message).append("\n"); + throw new Failure(message); + } + private void fail(String message) throws Failure { outBuffer.append("FAILURE: ").append(message).append("\n"); throw new Failure(message);