view tests/OnClientTester.java @ 580:13134a44dfc8 onclient

Test FileTransferHandler with multiple encodings
author Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com>
date Thu, 14 Oct 2021 16:26:36 +0200 (2021-10-14)
parents 687034945b3f
children 5aef0ea654b1
line wrap: on
line source
/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0.  If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
 */

import org.monetdb.jdbc.MonetConnection;
import org.monetdb.jdbc.MonetConnection.UploadHandler;
import org.monetdb.jdbc.MonetConnection.DownloadHandler;
import org.monetdb.util.FileTransferHandler;

import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;

import static java.nio.file.StandardOpenOption.CREATE_NEW;

public final class OnClientTester extends TestRunner {

	public OnClientTester(String jdbcUrl, int verbosity, boolean watchDogEnabled) {
		super(jdbcUrl, verbosity, watchDogEnabled);
	}

	public static void main(String[] args) throws SQLException, NoSuchMethodException, ClassNotFoundException {
		String jdbcUrl = null;
		String requiredPrefix = null;
		int verbosity = 0;
		boolean watchDogEnabled = true;

		// Don't know why I need this all of a sudden.. is it only on my system?
		Class.forName("org.monetdb.jdbc.MonetDriver");

		for (String arg : args) {
			if (arg.equals("-v"))
				verbosity++;
			else if (arg.equals("-vv"))
				verbosity += 2;
			else if (arg.equals("-w"))
				watchDogEnabled = false;
			else if (jdbcUrl == null)
				jdbcUrl = arg;
			else if (requiredPrefix == null)
				requiredPrefix = arg;
			else {
				System.err.println("Unexpected argument " + arg);
				System.exit(2);
			}
		}

		OnClientTester tester = new OnClientTester(jdbcUrl, verbosity, watchDogEnabled);
		int failures = tester.runTests(requiredPrefix);

		if (failures > 0)
			System.exit(1);
	}

	/// Some tests have to work limitations of the protocol or bugs in the server.
	/// This Enum is used to indicate the possibilities.
	public enum BugFixLevel {
		/// Only those tests that work with older MonetDB versions
		Baseline(0, 0, 0),

		/// Connection keeps working after download request has been refused by client
		CanRefuseDownload(11, 41, 12),

		;

		private final int major;
		private final int minor;
		private final int micro;

		BugFixLevel(int major, int minor, int micro) {
			this.major = major;
			this.minor = minor;
			this.micro = micro;
		}

		boolean includesVersion(int major, int minor, int micro) {
			if (major > this.major)
				return true;
			if (major < this.major)
				return false;
			if (minor > this.minor)
				return true;
			if (minor < this.minor)
				return false;
			return micro >= this.micro;
		}

		static BugFixLevel forVersion(String version) {
			String[] parts = version.split("[.]", 3);
			assert parts.length == 3;
			int major = Integer.parseInt(parts[0]);
			int minor = Integer.parseInt(parts[1]);
			int micro = Integer.parseInt(parts[2]);

			return BugFixLevel.forVersion(major, minor, micro);
		}

		static BugFixLevel forVersion(int major, int minor, int micro) {
			BugFixLevel lastValid = Baseline;
			for (BugFixLevel level: BugFixLevel.values()) {
				if (level.includesVersion(major, minor, micro))
					lastValid = level;
				else
					break;
			}
			return lastValid;
		}
	}

	void prepare() throws SQLException {
		execute("DROP TABLE IF EXISTS foo");
		execute("CREATE TABLE foo (i INT, t TEXT)");
	}

	private BugFixLevel getLevel() throws SQLException, Failure {
		String version = queryString("SELECT value FROM environment WHERE name = 'monet_version'");
		BugFixLevel level = BugFixLevel.forVersion(version);
		out.println("  NOTE: version " + version + " means level = " + level);
		return level;
	}

	public void test_BugFixLevel() throws Failure {
		assertEq("Baseline includes 0.0.0", true, BugFixLevel.Baseline.includesVersion(0, 0, 0));
		assertEq("Baseline includes 11.41.11", true, BugFixLevel.Baseline.includesVersion(11, 41, 11));
		assertEq("Baseline includes 11.41.12", true, BugFixLevel.Baseline.includesVersion(11, 41, 12));

		assertEq("CanRefuseDownload includes 0.0.0", false, BugFixLevel.CanRefuseDownload.includesVersion(0, 0, 0));

		assertEq("CanRefuseDownload includes 11.0.0", false, BugFixLevel.CanRefuseDownload.includesVersion(11, 0, 0));
		assertEq("CanRefuseDownload includes 12.0.0", true, BugFixLevel.CanRefuseDownload.includesVersion(12, 0, 0));

		assertEq("CanRefuseDownload includes 11.41.0", false, BugFixLevel.CanRefuseDownload.includesVersion(11, 41, 0));
		assertEq("CanRefuseDownload includes 11.42.0", true, BugFixLevel.CanRefuseDownload.includesVersion(11, 42, 0));

		assertEq("CanRefuseDownload includes 11.41.11", false, BugFixLevel.CanRefuseDownload.includesVersion(11, 41, 11));
		assertEq("CanRefuseDownload includes 11.41.12", true, BugFixLevel.CanRefuseDownload.includesVersion(11, 41, 12));

		assertEq("Level for 0.0.0", BugFixLevel.Baseline, BugFixLevel.forVersion(0, 0, 0));
		assertEq("Level for 11.0.0", BugFixLevel.Baseline, BugFixLevel.forVersion(11, 0, 0));
		assertEq("Level for 11.41.0", BugFixLevel.Baseline, BugFixLevel.forVersion(11, 41, 0));
		assertEq("Level for 11.41.11", BugFixLevel.Baseline, BugFixLevel.forVersion(11, 41, 11));
		assertEq("Level for 11.41.12", BugFixLevel.CanRefuseDownload, BugFixLevel.forVersion(11, 41, 12));
		assertEq("Level for 11.42.0", BugFixLevel.CanRefuseDownload, BugFixLevel.forVersion(11, 42, 0));
		assertEq("Level for 12.0.0", BugFixLevel.CanRefuseDownload, BugFixLevel.forVersion(12, 0, 0));

		assertEq("Level for \"11.41.11\"", BugFixLevel.Baseline, BugFixLevel.forVersion("11.41.11"));
		assertEq("Level for \"11.41.12\"", BugFixLevel.CanRefuseDownload, BugFixLevel.forVersion("11.41.12"));
	}

	public void test_Upload() throws SQLException, Failure {
		prepare();
		MyUploadHandler handler = new MyUploadHandler(100);
		conn.setUploadHandler(handler);
		update("COPY INTO foo FROM 'banana' ON CLIENT");
		assertEq("cancellation callback called", false, handler.isCancelled());
		assertQueryInt("SELECT COUNT(*) FROM foo", 100);
	}

	public void test_ClientRefusesUpload() throws SQLException, Failure {
		prepare();
		MyUploadHandler handler = new MyUploadHandler("immediate error");
		conn.setUploadHandler(handler);
		expectError("COPY INTO foo FROM 'banana' ON CLIENT", "immediate error");
		assertEq("cancellation callback called", false, handler.isCancelled());
		assertQueryInt("SELECT COUNT(*) FROM foo", 0);
	}

	public void test_Offset0() throws SQLException, Failure {
		prepare();
		MyUploadHandler handler = new MyUploadHandler(100);
		conn.setUploadHandler(handler);
		update("COPY OFFSET 0 INTO foo FROM 'banana' ON CLIENT");
		assertEq("cancellation callback called", false, handler.isCancelled());
		assertQueryInt("SELECT MIN(i) FROM foo", 1);
		assertQueryInt("SELECT MAX(i) FROM foo", 100);
	}

	public void test_Offset1() throws SQLException, Failure {
		prepare();
		MyUploadHandler handler = new MyUploadHandler(100);
		conn.setUploadHandler(handler);
		update("COPY OFFSET 1 INTO foo FROM 'banana' ON CLIENT");
		assertEq("cancellation callback called", false, handler.isCancelled());
		assertQueryInt("SELECT MIN(i) FROM foo", 1);
		assertQueryInt("SELECT MAX(i) FROM foo", 100);
	}

	public void test_Offset5() throws SQLException, Failure {
		prepare();
		MyUploadHandler handler = new MyUploadHandler(100);
		conn.setUploadHandler(handler);
		update("COPY OFFSET 5 INTO foo FROM 'banana' ON CLIENT");
		assertEq("cancellation callback called", false, handler.isCancelled());
		assertQueryInt("SELECT MIN(i) FROM foo", 5);
		assertQueryInt("SELECT MAX(i) FROM foo", 100);
	}

	public void test_ServerStopsReading() throws SQLException, Failure {
		prepare();
		MyUploadHandler handler = new MyUploadHandler(100);
		conn.setUploadHandler(handler);
		update("COPY 10 RECORDS INTO foo FROM 'banana' ON CLIENT");
		assertEq("cancellation callback called", true, handler.isCancelled());
		assertEq("handler encountered write error", true, handler.encounteredWriteError());
		// connection is still alive
		assertQueryInt("SELECT COUNT(i) FROM foo", 10);
	}

	public void test_Download(int n) throws SQLException, Failure {
		prepare();
		MyDownloadHandler handler = new MyDownloadHandler();
		conn.setDownloadHandler(handler);
		String q = "INSERT INTO foo SELECT value as i, 'number' || value AS t FROM sys.generate_series(0, " + n + ")";
		update(q);
		update("COPY (SELECT * FROM foo) INTO 'banana' ON CLIENT");
		assertEq("download attempts", 1, handler.countAttempts());
		assertEq("lines downloaded", n, handler.lineCount());
		// connection is still alive
		assertQueryInt("SELECT COUNT(*) FROM foo", n);
	}

	public void test_Download() throws SQLException, Failure {
		test_Download(100);
	}

	public void test_ClientRefusesDownload() throws SQLException, Failure {
		prepare();
		BugFixLevel level = getLevel();
		MyDownloadHandler handler = new MyDownloadHandler("download refused");
		conn.setDownloadHandler(handler);
		update("INSERT INTO foo SELECT value as i, 'number' || value AS t FROM sys.generate_series(0, 100)");
		expectError("COPY (SELECT * FROM foo) INTO 'banana' ON CLIENT", "download refused");
		// Wish it were different but the server closes the connection
		expectError("SELECT 42 -- check if the connection still works", "Connection to server lost!");
		if (level.compareTo(BugFixLevel.CanRefuseDownload) >= 0) {
			// connection is still alive
			assertQueryInt("SELECT COUNT(*) FROM foo", 100);
		}
	}

	public void test_LargeUpload() throws SQLException, Failure {
		watchDog.setDuration(25_000);
		prepare();
		int n = 4_000_000;
		MyUploadHandler handler = new MyUploadHandler(n);
		conn.setUploadHandler(handler);
		handler.setChunkSize(1024 * 1024);
		update("COPY INTO foo FROM 'banana' ON CLIENT");
		assertEq("cancellation callback called", false, handler.isCancelled());
		// connection is still alive
		assertQueryInt("SELECT COUNT(DISTINCT i) FROM foo", n);
	}

	public void test_LargeDownload() throws SQLException, Failure {
		watchDog.setDuration(25_000);
		test_Download(4_000_000);
	}

	public void test_UploadFromStream() throws SQLException, Failure {
		prepare();
		UploadHandler handler = new UploadHandler() {
			final String data = "1|one\n2|two\n3|three\n";

			@Override
			public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException {
				// ignoring linesToSkip as it's not used in this test
				ByteArrayInputStream s = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
				handle.uploadFrom(s);
			}
		};
		conn.setUploadHandler(handler);
		update("COPY INTO foo FROM 'banana' ON CLIENT");
		// connection is still alive
		assertQueryInt("SELECT i FROM foo WHERE t = 'three'", 3);
	}

	public void test_UploadFromReader() throws SQLException, Failure {
		prepare();
		UploadHandler handler = new UploadHandler() {
			final String data = "1|one\n2|two\n3|three\n";

			@Override
			public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException {
				// ignoring linesToSkip as it's not used in this test
				StringReader r = new StringReader(data);
				handle.uploadFrom(r);
			}
		};
		conn.setUploadHandler(handler);
		update("COPY INTO foo FROM 'banana' ON CLIENT");
		assertQueryInt("SELECT i FROM foo WHERE t = 'three'", 3);
	}

	public void test_UploadFromReaderOffset() throws SQLException, Failure {
		prepare();
		UploadHandler handler = new UploadHandler() {
			final String data = "1|one\n2|two\n3|three\n";

			@Override
			public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException {
				BufferedReader r = new BufferedReader(new StringReader(data));
				handle.uploadFrom(r, linesToSkip);
			}
		};
		conn.setUploadHandler(handler);
		update("COPY OFFSET 2 INTO foo FROM 'banana' ON CLIENT");
		assertQueryInt("SELECT i FROM foo WHERE t = 'three'", 3);
	}

	public void test_FailUploadLate() throws SQLException, Failure {
		prepare();
		MyUploadHandler handler = new MyUploadHandler(100, 50, "i don't like line 50");
		conn.setUploadHandler(handler);
		expectError("COPY INTO foo FROM 'banana' ON CLIENT", "i don't like");
		assertEq("cancellation callback called", false, handler.isCancelled());
		assertEq("connection is closed", true, conn.isClosed());
	}


	public void test_FailUploadLate2() throws SQLException, Failure {
		// Here we send empty lines only, to check if the server detects is properly instead
		// of simply complaining about an incomplete file.
		prepare();
		UploadHandler handler = new UploadHandler() {
			@Override
			public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException {
				// ignoring linesToSkip as it's not used in this test
				PrintStream stream = handle.getStream();
				for (int i = 1; i <= 20_000; i++)
					stream.println();
				stream.flush();
				throw new IOException("exception after all");
			}
		};
		conn.setUploadHandler(handler);
		expectError("COPY INTO foo(t) FROM 'banana'(t) ON CLIENT", "after all");
		assertEq("connection is closed", true, conn.isClosed());
		// Cannot check the server log, but at the time I checked, it said "prematurely stopped client", which is fine.
	}

	public void test_FailDownloadLate() throws SQLException, Failure {
		prepare();
		MyDownloadHandler handler = new MyDownloadHandler(200, "download refused");
		conn.setDownloadHandler(handler);
		update("INSERT INTO foo SELECT value as i, 'number' || value AS t FROM sys.generate_series(0, 100)");
		expectError("COPY (SELECT * FROM sys.generate_series(0,200)) INTO 'banana' ON CLIENT", "download refused");
		// Exception closes the connection
		assertEq("connection is closed", conn.isClosed(), true);
	}

	public void test_FileTransferHandlerUploadUtf8() throws IOException, SQLException, Failure {
		testFileTransferHandlerUpload("UTF-8");
	}

	public void test_FileTransferHandlerUploadLatin1() throws IOException, SQLException, Failure {
		testFileTransferHandlerUpload("latin1");
	}

	public void testFileTransferHandlerUpload(String encoding) throws IOException, SQLException, Failure {
		prepare();
		Path d = getTmpDir(currentTestName);
		Path f = d.resolve("data.txt");
		OutputStream s = Files.newOutputStream(f, CREATE_NEW);
		PrintStream ps = new PrintStream(s, false, encoding);
		ps.println("1|one");
		ps.println("2|tw??");
		ps.println("3|three");
		ps.close();
		conn.setUploadHandler(new FileTransferHandler(d, Charset.forName(encoding)));
		update("COPY INTO foo FROM 'data.txt' ON CLIENT");
		assertQueryInt("SELECT SUM(i) FROM foo", 6);
		assertQueryString("SELECT t FROM foo WHERE i = 2", "tw??");
	}

	public void test_FileTransferHandlerUploadRefused() throws IOException, SQLException, Failure {
		prepare();
		Path d = getTmpDir(currentTestName);
		Path f = d.resolve("data.txt");
		OutputStream s = Files.newOutputStream(f, CREATE_NEW);
		PrintStream ps = new PrintStream(s, false, "UTF-8");
		ps.println("1|one");
		ps.println("2|two");
		ps.println("3|three");
		ps.close();

		Path d2 = getTmpDir(currentTestName + "2");
		conn.setUploadHandler(new FileTransferHandler(d2, StandardCharsets.UTF_8));
		String quoted = f.toAbsolutePath().toString().replaceAll("'", "''");
		expectError("COPY INTO foo FROM R'"+ quoted + "' ON CLIENT", "not in upload directory");
		// connection is still alive
		assertQueryInt("SELECT SUM(i) FROM foo", 0);
	}

	public void test_FileTransferHandlerDownloadUtf8() throws SQLException, Failure, IOException {
		testFileTransferHandlerDownload(StandardCharsets.UTF_8);
	}

	public void test_FileTransferHandlerDownloadLatin1() throws SQLException, Failure, IOException {
		Charset latin1 = Charset.forName("latin1");
		testFileTransferHandlerDownload(latin1);
	}

	public void testFileTransferHandlerDownload(Charset encoding) throws SQLException, Failure, IOException {
		prepare();
		update("INSERT INTO foo VALUES (42, 'forty-tw??')");
		Path d = getTmpDir(currentTestName);
		conn.setDownloadHandler(new FileTransferHandler(d, encoding));
		update("COPY SELECT * FROM foo INTO 'data.txt' ON CLIENT");
		List<String> lines = Files.readAllLines(d.resolve("data.txt"), encoding);
		assertEq("lines written", lines.size(), 1);
		assertEq("line content", lines.get(0), "42|\"forty-tw??\"");
		// connection is still alive
		assertQueryInt("SELECT SUM(i) FROM foo", 42);
	}

	public void test_FileTransferHandlerDownloadRefused() throws SQLException, Failure, IOException {
		prepare();
		BugFixLevel level = getLevel();
		update("INSERT INTO foo VALUES (42, 'forty-two')");
		Path d = getTmpDir(currentTestName);
		Path d2 = getTmpDir(currentTestName + "2");
		conn.setDownloadHandler(new FileTransferHandler(d2, StandardCharsets.UTF_8));
		String quoted = d.resolve("data.txt").toAbsolutePath().toString().replaceAll("'", "''");
		expectError("COPY SELECT * FROM foo INTO R'" + quoted + "' ON CLIENT", "not in download directory");
		if (level.compareTo(BugFixLevel.CanRefuseDownload) >= 0) {
			// connection is still alive
			assertQueryInt("SELECT SUM(i) FROM foo", 42);
		}
	}

	static class MyUploadHandler implements UploadHandler {
		private final long rows;
		private final long errorAt;
		private final String errorMessage;
		private boolean encounteredWriteError = false;
		private boolean cancelled = false;

		private int chunkSize = 100; // small number to trigger more bugs

		MyUploadHandler(long rows, long errorAt, String errorMessage) {
			this.rows = rows;
			this.errorAt = errorAt;
			this.errorMessage = errorMessage;
		}

		MyUploadHandler(long rows) {
			this(rows, -1, null);
		}

		MyUploadHandler(String errorMessage) {
			this(0, -1, errorMessage);
		}

		public void setChunkSize(int chunkSize) {
			this.chunkSize = chunkSize;
		}

		@Override
		public void uploadCancelled() {
			cancelled = true;
		}

		@Override
		public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException {
			if (errorAt == -1 && errorMessage != null) {
				handle.sendError(errorMessage);
				return;
			}
			handle.setChunkSize(chunkSize);
			PrintStream stream = handle.getStream();
			for (long i = linesToSkip; i < rows; i++) {
				if (i == errorAt) {
					throw new IOException(errorMessage);
				}
				stream.printf("%d|%d%n", i + 1, i + 1);
				if (i % 25 == 0 && stream.checkError()) {
					encounteredWriteError = true;
					break;
				}
			}
		}

		public Object encounteredWriteError() {
			return encounteredWriteError;
		}

		public boolean isCancelled() {
			return cancelled;
		}
	}

	static class MyDownloadHandler implements DownloadHandler {
		private final int errorAtByte;
		private final String errorMessage;
		private int attempts = 0;
		private int bytesSeen = 0;
		private int lineEndingsSeen = 0;
		private int startOfLine = 0;

		MyDownloadHandler(int errorAtByte, String errorMessage) {
			this.errorAtByte = errorAtByte;
			this.errorMessage = errorMessage;
		}

		MyDownloadHandler(String errorMessage) {
			this(-1, errorMessage);
		}

		MyDownloadHandler() {
			this(-1, null);
		}

		@Override
		public void handleDownload(MonetConnection.Download handle, String name, boolean textMode) throws IOException {
			attempts++;
			bytesSeen = 0;
			lineEndingsSeen = 0;
			startOfLine = 0;

			if (errorMessage != null && errorAtByte < 0) {
				handle.sendError(errorMessage);
				return;
			}

			InputStream stream = handle.getStream();
			byte[] buffer = new byte[1024];
			while (true) {
				int toRead = buffer.length;
				if (errorMessage != null && errorAtByte >= 0) {
					if (bytesSeen == errorAtByte) {
						throw new IOException(errorMessage);
					}
					toRead = Integer.min(toRead, errorAtByte - bytesSeen);
				}
				int nread = stream.read(buffer, 0, toRead);
				if (nread < 0)
					break;
				for (int i = 0; i < nread; i++) {
					if (buffer[i] == '\n') {
						lineEndingsSeen += 1;
						startOfLine = bytesSeen + i + 1;
					}
				}
				bytesSeen += nread;
			}
		}

		public int countAttempts() {
			return attempts;
		}

		public int lineCount() {
			int lines = lineEndingsSeen;
			if (startOfLine != bytesSeen)
				lines++;
			return lines;
		}
	}

}