Mercurial > hg > monetdb-java
view tests/OnClientTester.java @ 970:f90d811e97eb default tip
Adjust getTableTypes() test for new table type: LOCAL TEMPORARY VIEW, added in 11.53.4 (Mar2025-SP1)
author | Martin van Dinther <martin.van.dinther@monetdbsolutions.com> |
---|---|
date | Thu, 03 Apr 2025 15:01:33 +0200 (32 hours ago) |
parents | ff075ed5ce81 |
children |
line wrap: on
line source
/* * SPDX-License-Identifier: MPL-2.0 * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. * * Copyright 2024, 2025 MonetDB Foundation; * Copyright August 2008 - 2023 MonetDB B.V.; * Copyright 1997 - July 2008 CWI. */ import org.monetdb.jdbc.MonetConnection; import org.monetdb.jdbc.MonetConnection.UploadHandler; import org.monetdb.jdbc.MonetConnection.DownloadHandler; import org.monetdb.util.FileTransferHandler; import java.io.*; import java.lang.Character.UnicodeBlock; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.zip.GZIPOutputStream; import static java.nio.file.StandardOpenOption.CREATE_NEW; /** * Program to test MonetDB JDBC Driver in combination with SQL: * COPY ... INTO ... ON CLIENT * commands. * This allows Java programmers to locally (so ON CLIENT) stream csv data * to and from the MonetDB server for fast bulk data import / export. * * Specifically it tests the MonetDB specific extensions to register upload and download handlers * see {@link org.monetdb.jdbc.MonetConnection#setUploadHandler(UploadHandler)} * see {@link org.monetdb.jdbc.MonetConnection#setDownloadHandler(DownloadHandler)} * and streaming of csv data to and from the MonetDB server using MAPI protocol. * * It also tests reading / writing data from / to a local file using * {@link org.monetdb.util.FileTransferHandler} * * @author JvR * @version 0.1 */ public final class OnClientTester { public static final int VERBOSITY_NONE = 0; public static final int VERBOSITY_ON = 1; public static final int VERBOSITY_SHOW_ALL = 2; private final String jdbcUrl; private final int verbosity; private final ArrayList<String> selectedTests; private String currentTestName; private long startTime; private MonetConnection conn; private Statement stmt; private StringBuilder outBuffer; private Path tmpDir; public OnClientTester(String jdbcUrl, int verbosity) { this.jdbcUrl = jdbcUrl; this.verbosity = verbosity; this.selectedTests = null; } public OnClientTester(String jdbcUrl, int verbosity, ArrayList<String> selectedTests) { this.jdbcUrl = jdbcUrl; this.verbosity = verbosity; this.selectedTests = selectedTests; } public static void main(String[] args) { String jdbcUrl = null; int verbosity = 0; ArrayList<String> selectedTests = new ArrayList<String>(); for (String arg : args) { if (arg.equals("-v")) verbosity++; else if (arg.equals("-vv")) verbosity += 2; else if (jdbcUrl == null) jdbcUrl = arg; else if (arg.startsWith("-")){ System.err.println("Unexpected argument " + arg); System.exit(2); } else { selectedTests.add(arg); } } if (jdbcUrl == null || jdbcUrl.isEmpty()) { System.err.println("Missing required startup argument: JDBC_connection_URL"); System.exit(1); } OnClientTester tester = new OnClientTester(jdbcUrl, verbosity, selectedTests); int failures = tester.runTests(); if (failures > 0) System.exit(-1); } boolean isSelected(String name) { return selectedTests == null || selectedTests.isEmpty() || selectedTests.contains(name); } public int runTests() { if (! openConnection()) return 1; // failed to open JDBC connection to MonetDB outBuffer = new StringBuilder(1024); int failures = 0; try { // all test methods start with test_ and have no arguments if (isSelected("BugFixLevel")) test_BugFixLevel(); if (isSelected("Upload")) test_Upload(); if (isSelected("UploadCrLf")) test_UploadCrLf(); if (isSelected("NormalizeCrLf")) test_NormalizeCrLf(); if (isSelected("ClientRefusesUpload")) test_ClientRefusesUpload(); if (isSelected("Offset0")) test_Offset0(); if (isSelected("Offset1")) test_Offset1(); if (isSelected("Offset5")) test_Offset5(); if (isSelected("ServerStopsReading")) test_ServerStopsReading(); if (isSelected("Download")) test_Download(); if (isSelected("ClientRefusesDownload")) test_ClientRefusesDownload(); if (isSelected("LargeUpload")) test_LargeUpload(); if (isSelected("LargeDownload")) test_LargeDownload(); if (isSelected("DownloadCrLf")) test_DownloadCrLf(); if (isSelected("UploadFromStream")) test_UploadFromStream(); if (isSelected("UploadFromReader")) test_UploadFromReader(); if (isSelected("UploadFromReaderOffset")) test_UploadFromReaderOffset(); if (isSelected("FailUploadLate")) test_FailUploadLate(); if (isSelected("FailUploadLate2")) test_FailUploadLate2(); if (isSelected("FailDownloadLate")) test_FailDownloadLate(); if (isSelected("FileTransferHandlerUploadUtf8")) test_FileTransferHandlerUploadUtf8(); if (isSelected("FileTransferHandlerUploadLatin1")) test_FileTransferHandlerUploadLatin1(); if (isSelected("FileTransferHandlerUploadNull")) test_FileTransferHandlerUploadNull(); if (isSelected("FileTransferHandlerUploadRefused")) test_FileTransferHandlerUploadRefused(); if (isSelected("FileTransferHandlerDownloadUtf8")) test_FileTransferHandlerDownloadUtf8(); if (isSelected("FileTransferHandlerDownloadLatin1")) test_FileTransferHandlerDownloadLatin1(); if (isSelected("FileTransferHandlerDownloadNull")) test_FileTransferHandlerDownloadNull(); if (isSelected("test_FileTransferHandlerUploadNotCompressed")) test_FileTransferHandlerUploadNotCompressed(); if (isSelected("test_FileTransferHandlerUploadNotCompressedSkip")) test_FileTransferHandlerUploadNotCompressedSkip(); if (isSelected("test_FileTransferHandlerUploadCompressed")) test_FileTransferHandlerUploadCompressed(); if (isSelected("test_FileTransferHandlerUploadCompressedSkip")) test_FileTransferHandlerUploadCompressedSkip(); if (isSelected("FileTransferHandlerDownloadRefused")) test_FileTransferHandlerDownloadRefused(); } catch (Failure e) { failures++; System.err.println(); System.err.println("Test " + currentTestName + " failed"); dumpOutput(); } catch (Exception e) { failures++; System.err.println(); System.err.println("Test " + currentTestName + " failed:"); e.printStackTrace(System.err); dumpOutput(); // Show the inner bits of the exception again, they may have scrolled off screen Throwable t = e; while (t.getCause() != null) { t = t.getCause(); } System.err.println("Innermost cause was " + t); if (t.getStackTrace().length > 0) { System.err.println(" at " + t.getStackTrace()[0]); } } finally { try { // cleanup created test table execute("DROP TABLE IF EXISTS foo"); } catch (SQLException e) { /* ignore */ } } closeConnection(); return failures; } private boolean openConnection() { try { // make a connection to MonetDB, its reused for all tests final Connection genericConnection = DriverManager.getConnection(jdbcUrl); conn = genericConnection.unwrap(MonetConnection.class); stmt = conn.createStatement(); return true; } catch (SQLException e) { System.err.println("Failed to connect using JDBC URL: " + jdbcUrl); System.err.println(e); Throwable t = e; while (t.getCause() != null) { t = t.getCause(); System.err.println("Caused by: " + t); } } return false; } private void closeConnection() { if (stmt != null) { try { stmt.close(); } catch (SQLException e) { /* ignore */ } stmt = null; } if (conn != null) { try { conn.close(); } catch (Exception e) { /* ignore */ } conn = null; } } private void initTest(final String name) { currentTestName = name; outBuffer.setLength(0); // clear the output log buffer startTime = System.currentTimeMillis(); } private void exitTest() { if (verbosity > VERBOSITY_ON) System.err.println(); if (verbosity >= VERBOSITY_ON) { final long duration = System.currentTimeMillis() - startTime; System.err.println("Test " + currentTestName + " succeeded in " + duration + "ms"); } if (verbosity >= VERBOSITY_SHOW_ALL) dumpOutput(); if (conn.isClosed()) openConnection(); // restore connection for next test } private void dumpOutput() { final String output = outBuffer.toString(); if (output.isEmpty()) { System.err.println("(Test " + currentTestName + " did not produce any output)"); } else { System.err.println("------ Accumulated output for test " + currentTestName + ":"); System.err.println(output); System.err.println("------ End of accumulated output"); } } /// Some tests have to work limitations of the protocol or bugs in the server. /// This Enum is used to indicate the possibilities. private enum BugFixLevel { /// Only those tests that work with older MonetDB versions Baseline(0, 0, 0), /// Connection keeps working after download request has been refused by client CanRefuseDownload(11, 41, 12), ; private final int major; private final int minor; private final int micro; BugFixLevel(int major, int minor, int micro) { this.major = major; this.minor = minor; this.micro = micro; } boolean includesVersion(int major, int minor, int micro) { if (major > this.major) return true; if (major < this.major) return false; if (minor > this.minor) return true; if (minor < this.minor) return false; return micro >= this.micro; } static BugFixLevel forVersion(String version) { String[] parts = version.split("[.]", 3); assert parts.length == 3; int major = Integer.parseInt(parts[0]); int minor = Integer.parseInt(parts[1]); int micro = Integer.parseInt(parts[2]); return BugFixLevel.forVersion(major, minor, micro); } static BugFixLevel forVersion(int major, int minor, int micro) { BugFixLevel lastValid = Baseline; for (BugFixLevel level : BugFixLevel.values()) { if (level.includesVersion(major, minor, micro)) lastValid = level; else break; } return lastValid; } } private void prepare() throws SQLException { execute("DROP TABLE IF EXISTS foo"); execute("CREATE TABLE foo (i INT, t CLOB)"); } private BugFixLevel getLevel() throws SQLException, Failure { String version = queryString("SELECT value FROM environment WHERE name = 'monet_version'"); BugFixLevel level = BugFixLevel.forVersion(version); outBuffer.append(" NOTE: version ").append(version).append(" means level = ").append(level).append("\n"); return level; } private void test_BugFixLevel() throws Failure { initTest("test_BugFixLevel"); assertEq("Baseline includes 0.0.0", true, BugFixLevel.Baseline.includesVersion(0, 0, 0)); assertEq("Baseline includes 11.41.11", true, BugFixLevel.Baseline.includesVersion(11, 41, 11)); assertEq("Baseline includes 11.41.12", true, BugFixLevel.Baseline.includesVersion(11, 41, 12)); assertEq("CanRefuseDownload includes 0.0.0", false, BugFixLevel.CanRefuseDownload.includesVersion(0, 0, 0)); assertEq("CanRefuseDownload includes 11.0.0", false, BugFixLevel.CanRefuseDownload.includesVersion(11, 0, 0)); assertEq("CanRefuseDownload includes 12.0.0", true, BugFixLevel.CanRefuseDownload.includesVersion(12, 0, 0)); assertEq("CanRefuseDownload includes 11.41.0", false, BugFixLevel.CanRefuseDownload.includesVersion(11, 41, 0)); assertEq("CanRefuseDownload includes 11.42.0", true, BugFixLevel.CanRefuseDownload.includesVersion(11, 42, 0)); assertEq("CanRefuseDownload includes 11.41.11", false, BugFixLevel.CanRefuseDownload.includesVersion(11, 41, 11)); assertEq("CanRefuseDownload includes 11.41.12", true, BugFixLevel.CanRefuseDownload.includesVersion(11, 41, 12)); assertEq("Level for 0.0.0", BugFixLevel.Baseline, BugFixLevel.forVersion(0, 0, 0)); assertEq("Level for 11.0.0", BugFixLevel.Baseline, BugFixLevel.forVersion(11, 0, 0)); assertEq("Level for 11.41.0", BugFixLevel.Baseline, BugFixLevel.forVersion(11, 41, 0)); assertEq("Level for 11.41.11", BugFixLevel.Baseline, BugFixLevel.forVersion(11, 41, 11)); assertEq("Level for 11.41.12", BugFixLevel.CanRefuseDownload, BugFixLevel.forVersion(11, 41, 12)); assertEq("Level for 11.42.0", BugFixLevel.CanRefuseDownload, BugFixLevel.forVersion(11, 42, 0)); assertEq("Level for 12.0.0", BugFixLevel.CanRefuseDownload, BugFixLevel.forVersion(12, 0, 0)); assertEq("Level for \"11.41.11\"", BugFixLevel.Baseline, BugFixLevel.forVersion("11.41.11")); assertEq("Level for \"11.41.12\"", BugFixLevel.CanRefuseDownload, BugFixLevel.forVersion("11.41.12")); exitTest(); } private void test_Upload() throws SQLException, Failure { initTest("test_Upload"); prepare(); MyUploadHandler handler = new MyUploadHandler(100); conn.setUploadHandler(handler); update("COPY INTO foo FROM 'banana' ON CLIENT"); assertEq("cancellation callback called", false, handler.isCancelled()); assertQueryInt("SELECT COUNT(*) FROM foo", 100); exitTest(); } private void test_UploadCrLf() throws SQLException, Failure { initTest("test_UploadCrLf"); prepare(); MonetConnection.UploadHandler handler = new MonetConnection.UploadHandler() { @Override public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException { String contentText = "100|foo\r\n10|bar\r\n1|baz\r\n"; byte[] contentBytes = contentText.getBytes(StandardCharsets.UTF_8); ByteArrayInputStream contentStream = new ByteArrayInputStream(contentBytes); handle.uploadFrom(contentStream); } }; conn.setUploadHandler(handler); update("COPY INTO foo FROM 'banana' ON CLIENT"); assertQueryInt("SELECT SUM(i * LENGTH(t)) FROM foo", 333); exitTest(); } private void test_NormalizeCrLf() throws Failure, IOException { initTest("test_NormalizeCrLf"); String[] fragments = { /* does not end in pending cr */ "\r\naaa\n\n\r\n", /* ends in pending cr */ "\n\r\naaa\r", /* clears it */ "\n", /* means call the single-argument write(), cr now pending */ "13", /* again, should flush the pending one and remain pending */ "13", /* now the pending cr should be dropped */ "10", /* same as above, but with arrays */ "\r", "\r", "\n", /* empty write should not clear the pending */ "\r", "", "\n", /* trailing \r */ "\r", }; ByteArrayOutputStream out0 = new ByteArrayOutputStream(); MonetConnection.StripCrLfStream out = new MonetConnection.StripCrLfStream(out0); ByteArrayOutputStream ref = new ByteArrayOutputStream(); ArrayList<Integer> fragmentPositions = new ArrayList<Integer>(); ArrayList<Boolean> wasPending = new ArrayList<Boolean>(); for (String f : fragments) { int pos = out0.toByteArray().length; boolean pending = out.pending(); fragmentPositions.add(pos); wasPending.add(pending); if (!f.isEmpty() && Character.isDigit(f.charAt(0))) { int n = Integer.parseInt(f); ref.write(n); out.write(n); } else { byte[] bytes = f.getBytes(StandardCharsets.UTF_8); ref.write(bytes); out.write(bytes); } } out.close(); String data = new String(out0.toByteArray()); String refData = new String(ref.toByteArray()).replaceAll("\r\n", "\n"); outBuffer.append("GOT\t\tEXPECTED\n"); int fragNo = 0; boolean different = false; for (int i = 0; i < data.length() || i < refData.length(); i++) { while (fragNo < fragmentPositions.size() && i == fragmentPositions.get(fragNo)) { outBuffer.append("(Start of fragment "); outBuffer.append(fragNo); if (wasPending.get(fragNo)) { outBuffer.append(", cr pending"); } else { outBuffer.append(", cr not pending"); } outBuffer.append(':'); String frag = fragments[fragNo]; if (!frag.isEmpty() && Character.isDigit(frag.charAt(0))) { outBuffer.append(Integer.parseInt(frag)); } else { for (int k = 0; k < frag.length(); k++) { int c = frag.charAt(k); outBuffer.append(' '); outBuffer.append(c); if (c == '\n' && k != frag.length() - 1) outBuffer.append(" "); } } outBuffer.append(")\n"); fragNo++; } int left = i < data.length() ? data.charAt(i) : 0; int right = i < refData.length() ? refData.charAt(i) : 0; outBuffer.append(left); outBuffer.append("\t\t"); outBuffer.append(right); if (!different && left != right) { outBuffer.append("\t\t <---------------------- first difference found!"); different = true; } outBuffer.append('\n'); } if (different) { fail("Normalized text is different than expected"); } exitTest(); } private void test_ClientRefusesUpload() throws SQLException, Failure { initTest("test_ClientRefusesUpload"); prepare(); MyUploadHandler handler = new MyUploadHandler("immediate error"); conn.setUploadHandler(handler); expectError("COPY INTO foo FROM 'banana' ON CLIENT", "immediate error"); assertEq("cancellation callback called", false, handler.isCancelled()); assertQueryInt("SELECT COUNT(*) FROM foo", 0); exitTest(); } private void test_Offset0() throws SQLException, Failure { initTest("test_Offset0"); prepare(); MyUploadHandler handler = new MyUploadHandler(100); conn.setUploadHandler(handler); update("COPY OFFSET 0 INTO foo FROM 'banana' ON CLIENT"); assertEq("cancellation callback called", false, handler.isCancelled()); assertQueryInt("SELECT MIN(i) FROM foo", 1); assertQueryInt("SELECT MAX(i) FROM foo", 100); exitTest(); } private void test_Offset1() throws SQLException, Failure { initTest("test_Offset1"); prepare(); MyUploadHandler handler = new MyUploadHandler(100); conn.setUploadHandler(handler); update("COPY OFFSET 1 INTO foo FROM 'banana' ON CLIENT"); assertEq("cancellation callback called", false, handler.isCancelled()); assertQueryInt("SELECT MIN(i) FROM foo", 1); assertQueryInt("SELECT MAX(i) FROM foo", 100); exitTest(); } private void test_Offset5() throws SQLException, Failure { initTest("test_Offset5"); prepare(); MyUploadHandler handler = new MyUploadHandler(100); conn.setUploadHandler(handler); update("COPY OFFSET 5 INTO foo FROM 'banana' ON CLIENT"); assertEq("cancellation callback called", false, handler.isCancelled()); assertQueryInt("SELECT MIN(i) FROM foo", 5); assertQueryInt("SELECT MAX(i) FROM foo", 100); exitTest(); } private void test_ServerStopsReading() throws SQLException, Failure { initTest("test_ServerStopsReading"); prepare(); long n = 2 * 1024 * 1024 / 10; MyUploadHandler handler = new MyUploadHandler(n); conn.setUploadHandler(handler); update("COPY 10 RECORDS INTO foo FROM 'banana' ON CLIENT"); assertEq("cancellation callback called", true, handler.isCancelled()); assertEq("handler encountered write error", true, handler.encounteredWriteError()); // connection is still alive assertQueryInt("SELECT COUNT(i) FROM foo", 10); exitTest(); } private void test_Download(int n) throws SQLException, Failure { prepare(); MyDownloadHandler handler = new MyDownloadHandler(); conn.setDownloadHandler(handler); String q = "INSERT INTO foo SELECT value as i, 'number' || value AS t FROM sys.generate_series(0, " + n + ")"; update(q); update("COPY (SELECT * FROM foo) INTO 'banana' ON CLIENT"); assertEq("download attempts", 1, handler.countAttempts()); assertEq("lines downloaded", n, handler.lineCount()); // connection is still alive assertQueryInt("SELECT COUNT(*) FROM foo", n); } private void test_Download() throws SQLException, Failure { initTest("test_Download"); test_Download(100); exitTest(); } private void test_ClientRefusesDownload() throws SQLException, Failure { initTest("test_ClientRefusesDownload"); prepare(); BugFixLevel level = getLevel(); MyDownloadHandler handler = new MyDownloadHandler("download refused"); conn.setDownloadHandler(handler); update("INSERT INTO foo SELECT value as i, 'number' || value AS t FROM sys.generate_series(0, 100)"); expectError("COPY (SELECT * FROM foo) INTO 'banana' ON CLIENT", "download refused"); // Wish it were different but the server closes the connection expectError("SELECT 42 -- check if the connection still works", "Connection to server lost!"); if (level.compareTo(BugFixLevel.CanRefuseDownload) >= 0) { // connection is still alive assertQueryInt("SELECT COUNT(*) FROM foo", 100); } exitTest(); } private void test_LargeUpload() throws SQLException, Failure { initTest("test_LargeUpload"); prepare(); int n = 4_000_000; MyUploadHandler handler = new MyUploadHandler(n); conn.setUploadHandler(handler); handler.setChunkSize(1024 * 1024); update("COPY INTO foo FROM 'banana' ON CLIENT"); assertEq("cancellation callback called", false, handler.isCancelled()); // connection is still alive assertQueryInt("SELECT COUNT(DISTINCT i) FROM foo", n); exitTest(); } private void test_LargeDownload() throws SQLException, Failure { initTest("test_LargeDownload"); test_Download(4_000_000); exitTest(); } private void test_DownloadCrLf() throws SQLException, Failure, IOException { // This tests forces line ending conversion and reads in small batches, hoping to trigger corner cases initTest("test_DownloadCrLf"); prepare(); update("ALTER TABLE foo DROP COLUMN t"); update("ALTER TABLE foo ADD COLUMN j INT"); update("INSERT INTO foo SELECT rand() % CASE WHEN value % 10 = 0 THEN 1000 ELSE 10 END AS i, 0 AS j FROM generate_series(0, 500000)"); ByteArrayOutputStream target = new ByteArrayOutputStream(); Random rng = new Random(42); DownloadHandler handler = (handle, name, textMode) -> { handle.setLineSeparator("\r\n"); InputStream s = handle.getStream(); byte[] buf = new byte[10]; boolean expectEof = false; for (;;) { int n = rng.nextInt(buf.length - 1) + 1; int nread = s.read(buf, 0, n); if (nread < 0) { break; } target.write(buf, 0, nread); } }; conn.setDownloadHandler(handler); update("COPY SELECT * FROM foo INTO 'banana' ON CLIENT"); // go to String instead of byte[] because Strings have handy replace methods. String result = new String(target.toByteArray(), StandardCharsets.UTF_8); // It should contain only \r\n's, no lonely \r's or \n's. String replaced = result.replaceAll("\r\n", "XX"); assertEq("Index of first lonely \\r", -1, replaced.indexOf('\r')); assertEq("Index of first lonely \\n", -1, replaced.indexOf('\n')); String withoutData = result.replaceAll("[0-9]", ""); assertEq("Length after dropping data, modulo 3", 0, withoutData.length() % 3); for (int i = 0; i < withoutData.length(); i += 3) { String sub = withoutData.substring(i, i+3); if (!sub.equals("|\r\n")) { fail(String.format( "At index %d out of %d in the skeleton (=digits removed) we find <%02x %02x %02x> instead of <7c 0d 0a>", i, withoutData.length(), (int)sub.charAt(0), (int)sub.charAt(1), (int)sub.charAt(2))); } } // only to show some successful output if the above succeeds assertEq("Every 3-byte normalized chunk", "|\\r\\n", "|\\r\\n"); exitTest(); } private void test_UploadFromStream() throws SQLException, Failure { initTest("test_UploadFromStream"); prepare(); UploadHandler handler = new UploadHandler() { final String data = "1|one\n2|two\n3|three\n"; @Override public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException { // ignoring linesToSkip as it's not used in this test ByteArrayInputStream s = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); handle.uploadFrom(s); } }; conn.setUploadHandler(handler); update("COPY INTO foo FROM 'banana' ON CLIENT"); // connection is still alive assertQueryInt("SELECT i FROM foo WHERE t = 'three'", 3); exitTest(); } private void test_UploadFromReader() throws SQLException, Failure { initTest("test_UploadFromReader"); prepare(); UploadHandler handler = new UploadHandler() { final String data = "1|one\n2|two\n3|three\n"; @Override public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException { // ignoring linesToSkip as it's not used in this test StringReader r = new StringReader(data); handle.uploadFrom(r); } }; conn.setUploadHandler(handler); update("COPY INTO foo FROM 'banana' ON CLIENT"); assertQueryInt("SELECT i FROM foo WHERE t = 'three'", 3); exitTest(); } private void test_UploadFromReaderOffset() throws SQLException, Failure { initTest("test_UploadFromReaderOffset"); prepare(); UploadHandler handler = new UploadHandler() { final String data = "1|one\n2|two\n3|three\n"; @Override public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException { BufferedReader r = new BufferedReader(new StringReader(data)); handle.uploadFrom(r, linesToSkip); } }; conn.setUploadHandler(handler); update("COPY OFFSET 2 INTO foo FROM 'banana' ON CLIENT"); assertQueryInt("SELECT i FROM foo WHERE t = 'three'", 3); exitTest(); } private void test_FailUploadLate() throws SQLException, Failure { initTest("test_FailUploadLate"); prepare(); MyUploadHandler handler = new MyUploadHandler(100, 50, "i don't like line 50"); conn.setUploadHandler(handler); expectError("COPY INTO foo FROM 'banana' ON CLIENT", "i don't like"); assertEq("cancellation callback called", false, handler.isCancelled()); assertEq("connection is closed", true, conn.isClosed()); exitTest(); } private void test_FailUploadLate2() throws SQLException, Failure { initTest("test_FailUploadLate2"); // Here we send empty lines only, to check if the server detects it properly instead // of simply complaining about an incomplete file. prepare(); UploadHandler handler = new UploadHandler() { @Override public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException { // ignoring linesToSkip as it's not used in this test PrintStream stream = handle.getStream(); for (int i = 1; i <= 20_000; i++) stream.println(); stream.flush(); throw new IOException("exception after all"); } }; conn.setUploadHandler(handler); expectError("COPY INTO foo(t) FROM 'banana'(t) ON CLIENT", "after all"); assertEq("connection is closed", true, conn.isClosed()); // Cannot check the server log, but at the time I checked, it said "prematurely stopped client", which is fine. exitTest(); } private void test_FailDownloadLate() throws SQLException, Failure { initTest("test_FailDownloadLate"); prepare(); MyDownloadHandler handler = new MyDownloadHandler(200, "download refused"); conn.setDownloadHandler(handler); update("INSERT INTO foo SELECT value as i, 'number' || value AS t FROM sys.generate_series(0, 100)"); expectError("COPY (SELECT * FROM sys.generate_series(0,200)) INTO 'banana' ON CLIENT", "download refused"); // Exception closes the connection assertEq("connection is closed", conn.isClosed(), true); exitTest(); } private void test_FileTransferHandlerUploadNotCompressed() throws IOException, SQLException, Failure { initTest("FileTransferHandlerUploadNotCompressed"); testFileTransferHandlerUploadCompressed(StandardCharsets.UTF_8, false, 0); exitTest(); } private void test_FileTransferHandlerUploadNotCompressedSkip() throws IOException, SQLException, Failure { initTest("FileTransferHandlerUploadNotCompressedSkip"); testFileTransferHandlerUploadCompressed(StandardCharsets.UTF_8, false, 2); exitTest(); } private void test_FileTransferHandlerUploadCompressed() throws IOException, SQLException, Failure { initTest("FileTransferHandlerUploadCompressed"); testFileTransferHandlerUploadCompressed(StandardCharsets.UTF_8, true, 0); exitTest(); } private void test_FileTransferHandlerUploadCompressedSkip() throws IOException, SQLException, Failure { initTest("FileTransferHandlerUploadCompressedSkip"); testFileTransferHandlerUploadCompressed(StandardCharsets.UTF_8, true, 2); exitTest(); } private void test_FileTransferHandlerUploadUtf8() throws IOException, SQLException, Failure { initTest("test_FileTransferHandlerUploadUtf8"); testFileTransferHandlerUploadEncoding(StandardCharsets.UTF_8, "UTF-8"); exitTest(); } private void test_FileTransferHandlerUploadLatin1() throws IOException, SQLException, Failure { initTest("test_FileTransferHandlerUploadLatin1"); testFileTransferHandlerUploadEncoding(Charset.forName("latin1"), "latin1"); exitTest(); } private void test_FileTransferHandlerUploadNull() throws IOException, SQLException, Failure { initTest("test_FileTransferHandlerUploadNull"); testFileTransferHandlerUploadEncoding(null, Charset.defaultCharset().name()); exitTest(); } private String hexdump(String s) { StringBuilder buf = new StringBuilder(); char[] chars = s.toCharArray(); for (char c: chars) { buf.append(' '); buf.append((int)c); // UnicodeBlock b = UnicodeBlock.of(c); // if (!Character.isISOControl(c) && b != null) { // if (b != UnicodeBlock.HIGH_SURROGATES && b != UnicodeBlock.LOW_SURROGATES && b != UnicodeBlock.SPECIALS) { // buf.append("='"); // buf.append(c); // buf.append("'"); // } // } } return "<" + buf.toString().trim() + ">"; } private void testFileTransferHandlerUploadEncoding(Charset handlerEncoding, String fileEncoding) throws IOException, SQLException, Failure { prepare(); outBuffer.append("Default encoding is " + Charset.defaultCharset().displayName() + "\n"); Path d = getTmpDir(currentTestName); Path f = d.resolve("data.txt"); OutputStream s = Files.newOutputStream(f, CREATE_NEW); PrintStream ps = new PrintStream(s, false, fileEncoding); ps.println("1|one"); ps.println("2|tw??"); ps.println("3|three"); ps.close(); conn.setUploadHandler(new FileTransferHandler(d, handlerEncoding)); update("COPY INTO foo FROM 'data.txt' ON CLIENT"); assertQueryInt("SELECT SUM(i) FROM foo", 6); final String result = queryString("SELECT t FROM foo WHERE i = 2"); String two = "tw??"; // String hexTwo = hexdump(two); String hexResult = hexdump(result); assertEq("query result hexdump", hexTwo, hexResult); // assertEq("query result", two, result); } private void testFileTransferHandlerUploadCompressed(Charset encoding, boolean compressed, int skipLines) throws IOException, SQLException, Failure { prepare(); Path d = getTmpDir(currentTestName); String fileName = "data.txt"; if (compressed) fileName += ".gz"; Path f = d.resolve(fileName); OutputStream s = Files.newOutputStream(f, CREATE_NEW); if (compressed) { s = new GZIPOutputStream(s); } Writer w = new OutputStreamWriter(s, encoding); PrintWriter ps = new PrintWriter(w); String[] words = { "one", "tw??", "three" }; int i = 0; int expectedSum = 0; for (String word: words) { int n = i + 1; ps.println("" + n + "|" + word); if (i >= skipLines) { expectedSum += n; } i += 1; } ps.close(); conn.setUploadHandler(new FileTransferHandler(d, encoding)); String query = "COPY OFFSET " + (skipLines + 1) + " INTO foo FROM '" + fileName + "' ON CLIENT"; update(query); assertQueryInt("SELECT SUM(i) FROM foo", expectedSum); } private void test_FileTransferHandlerUploadRefused() throws IOException, SQLException, Failure { initTest("test_FileTransferHandlerUploadRefused"); prepare(); Path d = getTmpDir(currentTestName); Path f = d.resolve("data.txt"); OutputStream s = Files.newOutputStream(f, CREATE_NEW); PrintStream ps = new PrintStream(s, false, "UTF-8"); ps.println("1|one"); ps.println("2|two"); ps.println("3|three"); ps.close(); Path d2 = getTmpDir(currentTestName + "2"); conn.setUploadHandler(new FileTransferHandler(d2, StandardCharsets.UTF_8)); String quoted = f.toAbsolutePath().toString().replaceAll("'", "''"); expectError("COPY INTO foo FROM R'"+ quoted + "' ON CLIENT", "not in upload directory"); // connection is still alive assertQueryInt("SELECT SUM(i) FROM foo", 0); exitTest(); } private void test_FileTransferHandlerDownloadUtf8() throws SQLException, Failure, IOException { initTest("test_FileTransferHandlerDownloadUtf8"); testFileTransferHandlerDownload(StandardCharsets.UTF_8, StandardCharsets.UTF_8); exitTest(); } private void test_FileTransferHandlerDownloadLatin1() throws SQLException, Failure, IOException { initTest("test_FileTransferHandlerDownloadLatin1"); Charset latin1 = Charset.forName("latin1"); testFileTransferHandlerDownload(latin1, latin1); exitTest(); } private void test_FileTransferHandlerDownloadNull() throws SQLException, Failure, IOException { initTest("test_FileTransferHandlerDownloadNull"); testFileTransferHandlerDownload(null, Charset.defaultCharset()); exitTest(); } private void testFileTransferHandlerDownload(Charset handlerEncoding, Charset fileEncoding) throws SQLException, Failure, IOException { prepare(); update("INSERT INTO foo VALUES (42, 'forty-tw??')"); Path d = getTmpDir(currentTestName); conn.setDownloadHandler(new FileTransferHandler(d, handlerEncoding)); update("COPY SELECT * FROM foo INTO 'data.txt' ON CLIENT"); List<String> lines = Files.readAllLines(d.resolve("data.txt"), fileEncoding); assertEq("lines written", lines.size(), 1); assertEq("line content", lines.get(0), "42|\"forty-tw??\""); // connection is still alive assertQueryInt("SELECT SUM(i) FROM foo", 42); } private void test_FileTransferHandlerDownloadRefused() throws SQLException, Failure, IOException { initTest("test_FileTransferHandlerDownloadRefused"); prepare(); BugFixLevel level = getLevel(); update("INSERT INTO foo VALUES (42, 'forty-two')"); Path d = getTmpDir(currentTestName); Path d2 = getTmpDir(currentTestName + "2"); conn.setDownloadHandler(new FileTransferHandler(d2, StandardCharsets.UTF_8)); String quoted = d.resolve("data.txt").toAbsolutePath().toString().replaceAll("'", "''"); expectError("COPY SELECT * FROM foo INTO R'" + quoted + "' ON CLIENT", "not in download directory"); if (level.compareTo(BugFixLevel.CanRefuseDownload) >= 0) { // connection is still alive assertQueryInt("SELECT SUM(i) FROM foo", 42); } exitTest(); } /* utility methods */ private void say(String message) throws Failure { outBuffer.append(message).append("\n"); throw new Failure(message); } private void fail(String message) throws Failure { outBuffer.append("FAILURE: ").append(message).append("\n"); throw new Failure(message); } private void checked(String quantity, Object actual) { outBuffer.append(" CHECKED: <").append(quantity).append("> is ").append(actual).append(" as expected").append("\n"); } private void assertEq(String quantity, Object expected, Object actual) throws Failure { if (expected.equals(actual)) { checked(quantity, actual); } else { fail("Expected <" + quantity + "> to be " + expected + " got " + actual); } } private boolean execute(String query) throws SQLException { outBuffer.append("EXECUTE: ").append(query).append("\n"); final boolean result = stmt.execute(query); outBuffer.append(" OK"); if (!result) { outBuffer.append(", updated ").append(stmt.getUpdateCount()).append(" rows"); } outBuffer.append("\n"); return result; } private void update(String query) throws SQLException, Failure { execute(query); } private void expectError(String query, String expectedError) throws SQLException { try { execute(query); } catch (SQLException e) { String msg = e.getMessage(); if (msg.contains(expectedError)) { outBuffer.append(" GOT EXPECTED EXCEPTION: ").append(msg).append("\n"); } else { throw e; } } } private void assertQueryInt(String query, int expected) throws SQLException, Failure { if (execute(query) == false) { fail("Query does not return a result set"); } final ResultSet rs = stmt.getResultSet(); assertEq("column count", 1, rs.getMetaData().getColumnCount()); if (!rs.next()) { rs.close(); fail("Result set is empty"); } final int result = rs.getInt(1); if (rs.next()) { rs.close(); fail("Result set has more than one row"); } rs.close(); checked("row count", 1); assertEq("query result", expected, result); } private void assertQueryString(String query, String expected) throws SQLException, Failure { final String result = queryString(query); assertEq("query result", expected, result); } private String queryString(String query) throws SQLException, Failure { if (execute(query) == false) { fail("Query does not return a result set"); } final ResultSet rs = stmt.getResultSet(); assertEq("column count", 1, rs.getMetaData().getColumnCount()); if (!rs.next()) { rs.close(); fail("Result set is empty"); } final String result = rs.getString(1); if (rs.next()) { rs.close(); fail("Result set has more than one row"); } rs.close(); checked("row count", 1); return result; } private synchronized Path getTmpDir(String name) throws IOException { if (tmpDir == null) { tmpDir = Files.createTempDirectory("testMonetDB"); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { Files.walkFileTree(tmpDir, new SimpleFileVisitor<Path>() { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { Files.delete(file); return FileVisitResult.CONTINUE; } @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { Files.delete(dir); return FileVisitResult.CONTINUE; } }); } catch (IOException e) { // we do this on a best effort basis } })); } final Path p = tmpDir.resolve(name); Files.createDirectory(p); return p; } /** * Implementation of an UploadHandler */ static class MyUploadHandler implements UploadHandler { private final long rows; private final long errorAt; private final String errorMessage; private boolean encounteredWriteError = false; private boolean cancelled = false; private int chunkSize = 100; // small number to trigger more bugs MyUploadHandler(long rows, long errorAt, String errorMessage) { this.rows = rows; this.errorAt = errorAt; this.errorMessage = errorMessage; } MyUploadHandler(long rows) { this(rows, -1, null); } MyUploadHandler(String errorMessage) { this(0, -1, errorMessage); } public void setChunkSize(int chunkSize) { this.chunkSize = chunkSize; } @Override public void uploadCancelled() { cancelled = true; } @Override public void handleUpload(MonetConnection.Upload handle, String name, boolean textMode, long linesToSkip) throws IOException { if (errorAt == -1 && errorMessage != null) { handle.sendError(errorMessage); return; } handle.setChunkSize(chunkSize); PrintStream stream = handle.getStream(); for (long i = linesToSkip; i < rows; i++) { if (i == errorAt) { throw new IOException(errorMessage); } stream.printf("%d|%d%n", i + 1, i + 1); if (i % 25 == 0 && stream.checkError()) { encounteredWriteError = true; break; } } } public Object encounteredWriteError() { return encounteredWriteError; } public boolean isCancelled() { return cancelled; } } /** * Implementation of a DownloadHandler */ static class MyDownloadHandler implements DownloadHandler { private final int errorAtByte; private final String errorMessage; private int attempts = 0; private int bytesSeen = 0; private int lineEndingsSeen = 0; private int startOfLine = 0; MyDownloadHandler(int errorAtByte, String errorMessage) { this.errorAtByte = errorAtByte; this.errorMessage = errorMessage; } MyDownloadHandler(String errorMessage) { this(-1, errorMessage); } MyDownloadHandler() { this(-1, null); } @Override public void handleDownload(MonetConnection.Download handle, String name, boolean textMode) throws IOException { attempts++; bytesSeen = 0; lineEndingsSeen = 0; startOfLine = 0; if (errorMessage != null && errorAtByte < 0) { handle.sendError(errorMessage); return; } InputStream stream = handle.getStream(); byte[] buffer = new byte[1024]; while (true) { int toRead = buffer.length; if (errorMessage != null && errorAtByte >= 0) { if (bytesSeen == errorAtByte) { throw new IOException(errorMessage); } toRead = Integer.min(toRead, errorAtByte - bytesSeen); } int nread = stream.read(buffer, 0, toRead); if (nread < 0) break; for (int i = 0; i < nread; i++) { if (buffer[i] == '\n') { lineEndingsSeen += 1; startOfLine = bytesSeen + i + 1; } } bytesSeen += nread; } } public int countAttempts() { return attempts; } public int lineCount() { int lines = lineEndingsSeen; if (startOfLine != bytesSeen) lines++; return lines; } } static class Failure extends Exception { static final long serialVersionUID = 3387516993124229948L; public Failure(String message) { super(message); } public Failure(String message, Throwable cause) { super(message, cause); } } }