Mercurial > hg > monetdb-java
changeset 613:9397c0b487f8
Normalize CRLF on upload
author | Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com> |
---|---|
date | Tue, 18 Jan 2022 13:18:43 +0100 (2022-01-18) |
parents | 1d44b8a577ca |
children | 2eb21a7167f9 |
files | src/main/java/org/monetdb/jdbc/MonetConnection.java tests/OnClientTester.java |
diffstat | 2 files changed, 189 insertions(+), 3 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main/java/org/monetdb/jdbc/MonetConnection.java +++ b/src/main/java/org/monetdb/jdbc/MonetConnection.java @@ -3236,7 +3236,7 @@ public class MonetConnection } final long linesToSkip = offset >= 1 ? offset - 1 : 0; - final Upload handle = new Upload(server, uploadHandler::uploadCancelled); + final Upload handle = new Upload(server, uploadHandler::uploadCancelled, textMode); final boolean wasFaking = server.setInsertFakePrompts(false); try { uploadHandler.handleUpload(handle, path, textMode, linesToSkip); @@ -3326,13 +3326,15 @@ public class MonetConnection public static class Upload { private final MapiSocket server; private final Runnable cancellationCallback; + private final boolean textMode; private PrintStream print = null; private String error = null; private int customChunkSize = -1; - Upload(MapiSocket server, Runnable cancellationCallback) { + Upload(MapiSocket server, Runnable cancellationCallback, boolean textMode) { this.server = server; this.cancellationCallback = cancellationCallback; + this.textMode = textMode; } /** @@ -3374,7 +3376,7 @@ public class MonetConnection try { final MapiSocket.UploadStream up = customChunkSize >= 0 ? server.uploadStream(customChunkSize) : server.uploadStream(); up.setCancellationCallback(cancellationCallback); - print = new PrintStream(up, false, "UTF-8"); + print = new PrintStream(textMode ? new StripCrLfStream(up) : up, false, "UTF-8"); up.write('\n'); } catch (UnsupportedEncodingException e) { throw new RuntimeException("The system is guaranteed to support the UTF-8 encoding but apparently it doesn't", e); @@ -3567,4 +3569,82 @@ public class MonetConnection } } } + + public static class StripCrLfStream extends FilterOutputStream { + private boolean crPending = false; + + public StripCrLfStream(OutputStream out) { + super(out); + } + + public boolean pending() { + return this.crPending; + } + + @Override + public void write(int b) throws IOException { + if (crPending && b != '\n') { + out.write('\r'); + } + if (b != '\r') { + out.write(b); + crPending = false; + } else { + crPending = true; + } + } + + @Override + public void write(byte[] b) throws IOException { + this.write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (len == 0) { + return; + } + if (crPending && b[0] != '\n') { + out.write('\r'); + } + + // deal with final \r up front + if (b[len - 1] == '\r') { + crPending = true; + len -= 1; + } else { + crPending = false; + } + + for (int i = off; i < off + len - 1; i++) { + if (b[i] == '\r' && b[i + 1] == '\n') { + int chunk = i - off; + out.write(b, off, chunk); + // chunk + 1 because we want to skip the \r + len -= chunk + 1; + off += chunk + 1; + // we don't have to look at the \n because we know it's no \r. + i++; + } + } + + // write the remainder + out.write(b, off, len); + } + + @Override + public void flush() throws IOException { + // we cannot flush our pending CR but we can ask our downstream to flush what we have sent them so far + out.flush(); + } + + @Override + public void close() throws IOException { + if (crPending) { + out.write('\r'); + } + crPending = false; + super.close(); + } + } }
--- a/tests/OnClientTester.java +++ b/tests/OnClientTester.java @@ -125,6 +125,10 @@ public final class OnClientTester { test_BugFixLevel(); if (isSelected("Upload")) test_Upload(); + if (isSelected("UploadCrLf")) + test_UploadCrLf(); + if (isSelected("NormalizeCrLf")) + test_NormalizeCrLf(); if (isSelected("ClientRefusesUpload")) test_ClientRefusesUpload(); if (isSelected("Offset0")) @@ -379,6 +383,108 @@ public final class OnClientTester { exitTest(); } + private void test_UploadCrLf() throws SQLException, Failure { + initTest("test_UploadCrLf"); + prepare(); + MonetConnection.UploadHandler handler = new MonetConnection.UploadHandler() { + @Override + public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException { + String contentText = "100|foo\r\n10|bar\r\n1|baz\r\n"; + byte[] contentBytes = contentText.getBytes(StandardCharsets.UTF_8); + ByteArrayInputStream contentStream = new ByteArrayInputStream(contentBytes); + handle.uploadFrom(contentStream); + } + }; + conn.setUploadHandler(handler); + update("COPY INTO foo FROM 'banana' ON CLIENT"); + assertQueryInt("SELECT SUM(i * LENGTH(t)) FROM foo", 333); + exitTest(); + } + + private void test_NormalizeCrLf() throws Failure, IOException { + initTest("test_NormalizeCrLf"); + String[] fragments = { + /* does not end in pending cr */ "\r\naaa\n\n\r\n", + /* ends in pending cr */ "\n\r\naaa\r", + /* clears it */ "\n", + /* means call the single-argument write(), cr now pending */ "13", + /* again, should flush the pending one and remain pending */ "13", + /* now the pending cr should be dropped */ "10", + /* same as above, but with arrays */ "\r", "\r", "\n", + /* empty write should not clear the pending */ "\r", "", "\n", + /* trailing \r */ "\r", + }; + + ByteArrayOutputStream out0 = new ByteArrayOutputStream(); + MonetConnection.StripCrLfStream out = new MonetConnection.StripCrLfStream(out0); + ByteArrayOutputStream ref = new ByteArrayOutputStream(); + ArrayList<Integer> fragmentPositions = new ArrayList(); + ArrayList<Boolean> wasPending = new ArrayList(); + for (String f : fragments) { + int pos = out0.toByteArray().length; + boolean pending = out.pending(); + fragmentPositions.add(pos); + wasPending.add(pending); + if (!f.isEmpty() && Character.isDigit(f.charAt(0))) { + int n = Integer.parseInt(f); + ref.write(n); + out.write(n); + } else { + byte[] bytes = f.getBytes(StandardCharsets.UTF_8); + ref.write(bytes); + out.write(bytes); + } + } + out.close(); + + String data = new String(out0.toByteArray()); + String refData = new String(ref.toByteArray()).replaceAll("\r\n", "\n"); + + outBuffer.append("GOT\t\tEXPECTED\n"); + int fragNo = 0; + boolean different = false; + for (int i = 0; i < data.length() || i < refData.length(); i++) { + while (fragNo < fragmentPositions.size() && i == fragmentPositions.get(fragNo)) { + outBuffer.append("(Start of fragment "); + outBuffer.append(fragNo); + if (wasPending.get(fragNo)) { + outBuffer.append(", cr pending"); + } else { + outBuffer.append(", cr not pending"); + } + outBuffer.append(':'); + String frag = fragments[fragNo]; + if (!frag.isEmpty() && Character.isDigit(frag.charAt(0))) { + outBuffer.append(Integer.parseInt(frag)); + } else { + for (int k = 0; k < frag.length(); k++) { + int c = frag.charAt(k); + outBuffer.append(' '); + outBuffer.append(c); + if (c == '\n' && k != frag.length() - 1) + outBuffer.append(" "); + } + } + outBuffer.append(")\n"); + fragNo++; + } + int left = i < data.length() ? data.charAt(i) : 0; + int right = i < refData.length() ? refData.charAt(i) : 0; + outBuffer.append(left); + outBuffer.append("\t\t"); + outBuffer.append(right); + if (!different && left != right) { + outBuffer.append("\t\t <---------------------- first difference found!"); + different = true; + } + outBuffer.append('\n'); + } + + if (different) { + fail("Normalized text is different than expected"); + } + } + private void test_ClientRefusesUpload() throws SQLException, Failure { initTest("test_ClientRefusesUpload"); prepare();