changeset 613:9397c0b487f8

Normalize CRLF on upload
author Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com>
date Tue, 18 Jan 2022 13:18:43 +0100 (2022-01-18)
parents 1d44b8a577ca
children 2eb21a7167f9
files src/main/java/org/monetdb/jdbc/MonetConnection.java tests/OnClientTester.java
diffstat 2 files changed, 189 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/org/monetdb/jdbc/MonetConnection.java
+++ b/src/main/java/org/monetdb/jdbc/MonetConnection.java
@@ -3236,7 +3236,7 @@ public class MonetConnection
 		}
 
 		final long linesToSkip = offset >= 1 ? offset - 1 : 0;
-		final Upload handle = new Upload(server, uploadHandler::uploadCancelled);
+		final Upload handle = new Upload(server, uploadHandler::uploadCancelled, textMode);
 		final boolean wasFaking = server.setInsertFakePrompts(false);
 		try {
 			uploadHandler.handleUpload(handle, path, textMode, linesToSkip);
@@ -3326,13 +3326,15 @@ public class MonetConnection
 	public static class Upload {
 		private final MapiSocket server;
 		private final Runnable cancellationCallback;
+		private final boolean textMode;
 		private PrintStream print = null;
 		private String error = null;
 		private int customChunkSize = -1;
 
-		Upload(MapiSocket server, Runnable cancellationCallback) {
+		Upload(MapiSocket server, Runnable cancellationCallback, boolean textMode) {
 			this.server = server;
 			this.cancellationCallback = cancellationCallback;
+			this.textMode = textMode;
 		}
 
 		/**
@@ -3374,7 +3376,7 @@ public class MonetConnection
 				try {
 					final MapiSocket.UploadStream up = customChunkSize >= 0 ? server.uploadStream(customChunkSize) : server.uploadStream();
 					up.setCancellationCallback(cancellationCallback);
-					print = new PrintStream(up, false, "UTF-8");
+					print = new PrintStream(textMode ? new StripCrLfStream(up) : up, false, "UTF-8");
 					up.write('\n');
 				} catch (UnsupportedEncodingException e) {
 					throw new RuntimeException("The system is guaranteed to support the UTF-8 encoding but apparently it doesn't", e);
@@ -3567,4 +3569,82 @@ public class MonetConnection
 			}
 		}
 	}
+
+	/**
+	 * Output stream filter that replaces every CR LF pair by a single LF. A CR
+	 * not followed by LF passes through unchanged. A CR that ends a write is held
+	 * back until the next write, or {@link #close()}, decides whether it is emitted.
+	 */
+	public static class StripCrLfStream extends FilterOutputStream {
+		/** True while a CR has been received that is neither forwarded nor dropped yet. */
+		private boolean crPending = false;
+
+		public StripCrLfStream(OutputStream out) {
+			super(out);
+		}
+
+		/** @return true if a CR is currently held back */
+		public boolean pending() {
+			return this.crPending;
+		}
+
+		@Override
+		public void write(int b) throws IOException {
+			if (crPending && b != '\n') {
+				out.write('\r');
+			}
+			crPending = b == '\r'; // a CR is held back until we know what follows it
+			if (!crPending) {
+				out.write(b);
+			}
+		}
+
+		@Override
+		public void write(byte[] b) throws IOException {
+			this.write(b, 0, b.length);
+		}
+
+		/**
+		 * Writes {@code len} bytes of {@code b} starting at {@code off}, leaving out
+		 * the CR of every CR LF pair, including a pair split over two calls.
+		 */
+		@Override
+		public void write(byte[] b, int off, int len) throws IOException {
+			if (len == 0) {
+				return; // nothing to look at, so a held-back CR stays held back
+			}
+			if (crPending && b[off] != '\n') { // first byte written is b[off], not b[0]
+				out.write('\r');
+			}
+			final int end = off + len;
+			// hold back a final CR; the last byte written is b[end - 1], not b[len - 1]
+			crPending = b[end - 1] == '\r';
+			final int limit = crPending ? end - 1 : end;
+			int start = off;
+			for (int i = off; i < limit - 1; i++) {
+				if (b[i] == '\r' && b[i + 1] == '\n') {
+					out.write(b, start, i - start);
+					start = i + 1; // continue at the LF, which drops the CR
+					i++; // the LF itself cannot be the start of another pair
+				}
+			}
+			out.write(b, start, limit - start);
+		}
+
+		@Override
+		public void flush() throws IOException {
+			// a held-back CR cannot be flushed, but downstream can flush what it already got
+			out.flush();
+		}
+
+		/** Emits a CR that is still held back, then delegates to {@code super.close()}. */
+		@Override
+		public void close() throws IOException {
+			if (crPending) {
+				out.write('\r');
+				crPending = false;
+			}
+			super.close();
+		}
+	}
 }
--- a/tests/OnClientTester.java
+++ b/tests/OnClientTester.java
@@ -125,6 +125,10 @@ public final class OnClientTester {
 				test_BugFixLevel();
 			if (isSelected("Upload"))
 				test_Upload();
+			if (isSelected("UploadCrLf"))
+				test_UploadCrLf();
+			if (isSelected("NormalizeCrLf"))
+				test_NormalizeCrLf();
 			if (isSelected("ClientRefusesUpload"))
 				test_ClientRefusesUpload();
 			if (isSelected("Offset0"))
@@ -379,6 +383,108 @@ public final class OnClientTester {
 		exitTest();
 	}
 
+	/** Uploads three CR LF terminated rows through a custom upload handler and checks the query result. */
+	private void test_UploadCrLf() throws SQLException, Failure {
+		initTest("test_UploadCrLf");
+		prepare();
+		// the handler serves the same fixed content for whatever file name is requested
+		conn.setUploadHandler(new MonetConnection.UploadHandler() {
+			@Override
+			public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException {
+				final byte[] rows = "100|foo\r\n10|bar\r\n1|baz\r\n".getBytes(StandardCharsets.UTF_8);
+				handle.uploadFrom(new ByteArrayInputStream(rows));
+			}
+		});
+		update("COPY INTO foo FROM 'banana' ON CLIENT");
+		// 333 = (100 + 10 + 1) * 3: presumably each text value has length 3, so no CR was stored
+		assertQueryInt("SELECT SUM(i * LENGTH(t)) FROM foo", 333);
+		exitTest();
+	}
+
+	/**
+	 * Writes a series of fragments to a StripCrLfStream and checks that the output
+	 * equals the same bytes with every CR LF replaced by LF; dumps both to outBuffer.
+	 */
+	private void test_NormalizeCrLf() throws Failure, IOException {
+		initTest("test_NormalizeCrLf");
+		String[] fragments = {
+				/* does not end in pending cr */ "\r\naaa\n\n\r\n",
+				/* ends in pending cr */ "\n\r\naaa\r",
+				/* clears it */ "\n",
+				/* means call the single-argument write(), cr now pending */ "13",
+				/* again, should flush the pending one and remain pending */ "13",
+				/* now the pending cr should be dropped */ "10",
+				/* same as above, but with arrays */ "\r", "\r", "\n",
+				/* empty write should not clear the pending */ "\r", "", "\n",
+				/* trailing \r */ "\r",
+		};
+
+		ByteArrayOutputStream out0 = new ByteArrayOutputStream();
+		MonetConnection.StripCrLfStream out = new MonetConnection.StripCrLfStream(out0);
+		ByteArrayOutputStream ref = new ByteArrayOutputStream();
+		ArrayList<Integer> fragmentPositions = new ArrayList<>();
+		ArrayList<Boolean> wasPending = new ArrayList<>();
+		for (String f : fragments) {
+			fragmentPositions.add(out0.size());
+			wasPending.add(out.pending());
+			if (!f.isEmpty() && Character.isDigit(f.charAt(0))) {
+				int n = Integer.parseInt(f);
+				ref.write(n);
+				out.write(n);
+			} else {
+				byte[] bytes = f.getBytes(StandardCharsets.UTF_8);
+				ref.write(bytes);
+				out.write(bytes);
+			}
+		}
+		out.close();
+
+		// decode with the charset used to encode; new String(byte[]) uses the platform default
+		String data = new String(out0.toByteArray(), StandardCharsets.UTF_8);
+		String refData = new String(ref.toByteArray(), StandardCharsets.UTF_8).replace("\r\n", "\n");
+
+		outBuffer.append("GOT\t\tEXPECTED\n");
+		int fragNo = 0;
+		boolean different = false;
+		for (int i = 0; i < data.length() || i < refData.length(); i++) {
+			while (fragNo < fragmentPositions.size() && i == fragmentPositions.get(fragNo)) {
+				outBuffer.append("(Start of fragment ");
+				outBuffer.append(fragNo);
+				outBuffer.append(wasPending.get(fragNo) ? ", cr pending" : ", cr not pending");
+				outBuffer.append(':');
+				String frag = fragments[fragNo];
+				if (!frag.isEmpty() && Character.isDigit(frag.charAt(0))) {
+					outBuffer.append(Integer.parseInt(frag));
+				} else {
+					for (int k = 0; k < frag.length(); k++) {
+						int c = frag.charAt(k);
+						outBuffer.append(' ');
+						outBuffer.append(c);
+						if (c == '\n' && k != frag.length() - 1)
+							outBuffer.append("  ");
+					}
+				}
+				outBuffer.append(")\n");
+				fragNo++;
+			}
+			int left = i < data.length() ? data.charAt(i) : 0;
+			int right = i < refData.length() ? refData.charAt(i) : 0;
+			outBuffer.append(left);
+			outBuffer.append("\t\t");
+			outBuffer.append(right);
+			if (!different && left != right) {
+				outBuffer.append("\t\t <---------------------- first difference found!");
+				different = true;
+			}
+			outBuffer.append('\n');
+		}
+
+		if (different) {
+			fail("Normalized text is different than expected");
+		}
+		exitTest(); // pair initTest() with exitTest(), as the sibling tests do
+	}
+
 	private void test_ClientRefusesUpload() throws SQLException, Failure {
 		initTest("test_ClientRefusesUpload");
 		prepare();