Re: Parallel batch insert failure in MonetDB through JDBC
Hi, I am trying to insert data into multiple tables in parallel through batch execution of prepared statements. I am facing a problem: the first thread that executes its batch causes the other executions to crash with the underlying exception "prepared statement is no longer available". Is there a limitation on updating the schema in parallel jobs? If not, what may be the cause of this problem? Any light on how I may be able to implement this would be helpful.
Code that executes the batches:
package monet.test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
 * Stress test that inserts batches into several MonetDB tables in parallel, using one table,
 * one JDBC connection and one thread per runner.
 */
public class BatchTest {

    /**
     * Creates a table, fills it through batched prepared-statement inserts and drops it again.
     *
     * <p>Each runner owns its own {@link Connection}, opened in the constructor and closed at the
     * end of {@link #run()}. The create/drop statements of all runners are serialized through a
     * shared lock; the inserts are not.
     *
     * <p>NOTE(review): the reported "prepared statement is no longer available" error presumably
     * comes from MonetDB discarding prepared statements once another session commits a schema
     * change (the create/drop issued here) - confirm against the MonetDB PREPARE documentation.
     * If that is the cause, all tables have to be created before any runner prepares its insert,
     * and dropped only after every runner has finished its batches.
     */
    public static class TestRunner implements Runnable {

        private static final String JDBC_URL = "jdbc:monetdb://localhost/test-db";

        /**
         * Dedicated monitor for the DDL statements. A private object is used instead of the URL
         * string literal, because literals are interned and could be locked by unrelated code.
         */
        private static final Object DDL_LOCK = new Object();

        static {
            try {
                Class.forName("nl.cwi.monetdb.jdbc.MonetDriver");
            } catch (ClassNotFoundException e) {
                // Not fatal here: createConnection() will report the missing driver as SQLException.
                e.printStackTrace();
            }
        }

        /** Opens a new connection; called once per runner. */
        private static Connection createConnection() throws SQLException {
            return DriverManager.getConnection(JDBC_URL, "monetdb", "monetdb");
        }

        private static final int FIELD_COUNT = 10;
        private static final int[] COLUMN_TYPES = new int[FIELD_COUNT];
        private static final int[] TYPES = { Types.VARCHAR, Types.TIMESTAMP, Types.DECIMAL };

        static {
            // Pick a random column layout once; all runners share the same layout.
            Random random = new Random();
            for (int i = 0; i < FIELD_COUNT; i++) {
                COLUMN_TYPES[i] = TYPES[random.nextInt(TYPES.length)];
            }
        }

        private final String name;
        private final int batchCount;
        private final int batchSize;
        private final String create;
        private final String drop;
        private final String insert;
        private Connection con;

        /**
         * Prepares the SQL text for one test table and opens the runner's connection.
         *
         * @param string table name; it is concatenated into the SQL, so it must be a valid,
         *               trusted identifier
         * @param bs     number of rows per batch
         * @param bc     number of batches to submit
         * @throws SQLException if the connection cannot be opened
         */
        public TestRunner(String string, int bs, int bc) throws SQLException {
            this.name = string;
            this.batchSize = bs;
            this.batchCount = bc;

            StringBuilder columns = new StringBuilder();
            StringBuilder placeholders = new StringBuilder();
            for (int i = 0; i < FIELD_COUNT; i++) {
                if (i > 0) {
                    columns.append(',');
                    placeholders.append(',');
                }
                columns.append("col").append(i).append(' ').append(getType(COLUMN_TYPES[i]));
                placeholders.append('?');
            }
            this.create = "create table " + name + " (" + columns + ")";
            this.insert = "insert into " + name + " values (" + placeholders + ")";
            this.drop = "drop table " + name;

            // Opened last so that nothing can fail after the connection exists.
            this.con = createConnection();
        }

        /** Maps a {@link Types} constant to the SQL column type used in the create statement. */
        private static String getType(int i) {
            switch (i) {
            case Types.DECIMAL:
                return "decimal(18,9)";
            case Types.VARCHAR:
                return "varchar(30000)";
            case Types.TIMESTAMP:
                return "timestamp";
            default:
                // Returning null here would silently put the text "null" into the SQL.
                throw new IllegalArgumentException("unsupported column type: " + i);
            }
        }

        /** Safety net that closes the connection if {@link #run()} was never executed. */
        @Override
        protected void finalize() throws Throwable {
            try {
                if (con != null) {
                    con.close();
                }
            } finally {
                super.finalize();
            }
        }

        /**
         * Creates the table, submits the batches, drops the table and closes the connection.
         * SQL failures are printed, not thrown. The connection is closed on every path,
         * including a failed table creation.
         */
        @Override
        public void run() {
            System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n",
                    name, create, insert, drop);
            try {
                if (!createTable()) {
                    return;
                }
                try {
                    insertBatches();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // The table is dropped even when the inserts failed.
                    dropTable();
                }
            } finally {
                closeConnection();
            }
            System.out.format("%s finished.%n", name);
        }

        /** Executes the create statement under the DDL lock; returns false if it failed. */
        private boolean createTable() {
            try (Statement stmt = con.createStatement()) {
                synchronized (DDL_LOCK) {
                    System.out.format("%d created %s%n", stmt.executeUpdate(create), name);
                    commitIfNeeded();
                }
                return true;
            } catch (SQLException e) {
                e.printStackTrace();
                return false;
            }
        }

        /** Submits {@code batchCount} batches of {@code batchSize} rows each. */
        private void insertBatches() throws SQLException {
            try (PreparedStatement stmt = con.prepareStatement(insert)) {
                for (int batch = 0; batch < batchCount; batch++) {
                    for (int row = 0; row < batchSize; row++) {
                        setBatchInput(stmt);
                        stmt.addBatch();
                    }
                    System.out.format("%s - submitting batch ...%n", name);
                    stmt.executeBatch();
                    commitIfNeeded();
                }
            }
        }

        /** Executes the drop statement under the DDL lock; failures are only printed. */
        private void dropTable() {
            try (Statement stmt = con.createStatement()) {
                synchronized (DDL_LOCK) {
                    System.out.format("%d deleted - %s%n", stmt.executeUpdate(drop), name);
                    commitIfNeeded();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        /** Commits the current transaction unless the connection is in auto-commit mode. */
        private void commitIfNeeded() throws SQLException {
            if (!con.getAutoCommit()) {
                con.commit();
            }
        }

        /** Closes the connection (if still open) and clears the field. */
        private void closeConnection() {
            Connection toClose = con;
            con = null;
            if (toClose != null) {
                try {
                    toClose.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

        /** Binds one value per column, matching the shared column layout. */
        private static void setBatchInput(PreparedStatement stmt) throws SQLException {
            for (int i = 1; i <= FIELD_COUNT; i++) {
                stmt.setObject(i, getRandomFieldValue(COLUMN_TYPES[i - 1]));
            }
        }

        /**
         * Returns a sample value for the given column type. Despite the name, only the
         * timestamp varies (current time); decimal and varchar values are constants.
         */
        private static Object getRandomFieldValue(int type) {
            switch (type) {
            case Types.DECIMAL:
                return 0;
            case Types.VARCHAR:
                return "null";
            case Types.TIMESTAMP:
                return new Timestamp(System.currentTimeMillis());
            default:
                throw new IllegalArgumentException("unsupported column type: " + type);
            }
        }
    }

    /**
     * Starts two runners in parallel and waits for both of them to finish.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        int num = 2;
        List<Thread> ts = new ArrayList<>();
        while (num-- > 0) {
            try {
                Thread t = new Thread(new TestRunner("runner_" + num, 10000, 1));
                t.start();
                // Only started threads are tracked; a failed runner must not leave a null entry.
                ts.add(t);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                // Restore the flag and stop waiting; further joins would fail immediately anyway.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                break;
            }
        }
    }
}
On Wed, Nov 4, 2015 at 3:20 PM, Siddharth Tyagi
Hi, I am trying to insert data into multiple tables in parallel through batch execution of prepared statements. I am facing a problem:
the first thread that executes its batch causes the other executions to crash with the underlying exception "prepared statement is no longer available".
Is there a limitation on updating the schema in parallel jobs? If not, what may be the cause of this problem?
Any light on how I may be able to implement this would be helpful.
code that executes batches
package monet.test;
import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; import java.util.List; import java.util.Random;
/**
 * Stress test that inserts batches into several MonetDB tables in parallel, using one table,
 * one JDBC connection and one thread per runner.
 */
public class BatchTest {

    /**
     * Creates a table, fills it through batched prepared-statement inserts and drops it again.
     *
     * <p>Each runner owns its own {@link Connection}, opened in the constructor and closed at the
     * end of {@link #run()}. The create/drop statements of all runners are serialized through a
     * shared lock; the inserts are not.
     *
     * <p>NOTE(review): the reported "prepared statement is no longer available" error presumably
     * comes from MonetDB discarding prepared statements once another session commits a schema
     * change (the create/drop issued here) - confirm against the MonetDB PREPARE documentation.
     * If that is the cause, all tables have to be created before any runner prepares its insert,
     * and dropped only after every runner has finished its batches.
     */
    public static class TestRunner implements Runnable {

        private static final String JDBC_URL = "jdbc:monetdb://localhost/test-db";

        /**
         * Dedicated monitor for the DDL statements. A private object is used instead of the URL
         * string literal, because literals are interned and could be locked by unrelated code.
         */
        private static final Object DDL_LOCK = new Object();

        static {
            try {
                Class.forName("nl.cwi.monetdb.jdbc.MonetDriver");
            } catch (ClassNotFoundException e) {
                // Not fatal here: createConnection() will report the missing driver as SQLException.
                e.printStackTrace();
            }
        }

        /** Opens a new connection; called once per runner. */
        private static Connection createConnection() throws SQLException {
            return DriverManager.getConnection(JDBC_URL, "monetdb", "monetdb");
        }

        private static final int FIELD_COUNT = 10;
        private static final int[] COLUMN_TYPES = new int[FIELD_COUNT];
        private static final int[] TYPES = { Types.VARCHAR, Types.TIMESTAMP, Types.DECIMAL };

        static {
            // Pick a random column layout once; all runners share the same layout.
            Random random = new Random();
            for (int i = 0; i < FIELD_COUNT; i++) {
                COLUMN_TYPES[i] = TYPES[random.nextInt(TYPES.length)];
            }
        }

        private final String name;
        private final int batchCount;
        private final int batchSize;
        private final String create;
        private final String drop;
        private final String insert;
        private Connection con;

        /**
         * Prepares the SQL text for one test table and opens the runner's connection.
         *
         * @param string table name; it is concatenated into the SQL, so it must be a valid,
         *               trusted identifier
         * @param bs     number of rows per batch
         * @param bc     number of batches to submit
         * @throws SQLException if the connection cannot be opened
         */
        public TestRunner(String string, int bs, int bc) throws SQLException {
            this.name = string;
            this.batchSize = bs;
            this.batchCount = bc;

            StringBuilder columns = new StringBuilder();
            StringBuilder placeholders = new StringBuilder();
            for (int i = 0; i < FIELD_COUNT; i++) {
                if (i > 0) {
                    columns.append(',');
                    placeholders.append(',');
                }
                columns.append("col").append(i).append(' ').append(getType(COLUMN_TYPES[i]));
                placeholders.append('?');
            }
            this.create = "create table " + name + " (" + columns + ")";
            this.insert = "insert into " + name + " values (" + placeholders + ")";
            this.drop = "drop table " + name;

            // Opened last so that nothing can fail after the connection exists.
            this.con = createConnection();
        }

        /** Maps a {@link Types} constant to the SQL column type used in the create statement. */
        private static String getType(int i) {
            switch (i) {
            case Types.DECIMAL:
                return "decimal(18,9)";
            case Types.VARCHAR:
                return "varchar(30000)";
            case Types.TIMESTAMP:
                return "timestamp";
            default:
                // Returning null here would silently put the text "null" into the SQL.
                throw new IllegalArgumentException("unsupported column type: " + i);
            }
        }

        /** Safety net that closes the connection if {@link #run()} was never executed. */
        @Override
        protected void finalize() throws Throwable {
            try {
                if (con != null) {
                    con.close();
                }
            } finally {
                super.finalize();
            }
        }

        /**
         * Creates the table, submits the batches, drops the table and closes the connection.
         * SQL failures are printed, not thrown. The connection is closed on every path,
         * including a failed table creation.
         */
        @Override
        public void run() {
            System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n",
                    name, create, insert, drop);
            try {
                if (!createTable()) {
                    return;
                }
                try {
                    insertBatches();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // The table is dropped even when the inserts failed.
                    dropTable();
                }
            } finally {
                closeConnection();
            }
            System.out.format("%s finished.%n", name);
        }

        /** Executes the create statement under the DDL lock; returns false if it failed. */
        private boolean createTable() {
            try (Statement stmt = con.createStatement()) {
                synchronized (DDL_LOCK) {
                    System.out.format("%d created %s%n", stmt.executeUpdate(create), name);
                    commitIfNeeded();
                }
                return true;
            } catch (SQLException e) {
                e.printStackTrace();
                return false;
            }
        }

        /** Submits {@code batchCount} batches of {@code batchSize} rows each. */
        private void insertBatches() throws SQLException {
            try (PreparedStatement stmt = con.prepareStatement(insert)) {
                for (int batch = 0; batch < batchCount; batch++) {
                    for (int row = 0; row < batchSize; row++) {
                        setBatchInput(stmt);
                        stmt.addBatch();
                    }
                    System.out.format("%s - submitting batch ...%n", name);
                    stmt.executeBatch();
                    commitIfNeeded();
                }
            }
        }

        /** Executes the drop statement under the DDL lock; failures are only printed. */
        private void dropTable() {
            try (Statement stmt = con.createStatement()) {
                synchronized (DDL_LOCK) {
                    System.out.format("%d deleted - %s%n", stmt.executeUpdate(drop), name);
                    commitIfNeeded();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        /** Commits the current transaction unless the connection is in auto-commit mode. */
        private void commitIfNeeded() throws SQLException {
            if (!con.getAutoCommit()) {
                con.commit();
            }
        }

        /** Closes the connection (if still open) and clears the field. */
        private void closeConnection() {
            Connection toClose = con;
            con = null;
            if (toClose != null) {
                try {
                    toClose.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

        /** Binds one value per column, matching the shared column layout. */
        private static void setBatchInput(PreparedStatement stmt) throws SQLException {
            for (int i = 1; i <= FIELD_COUNT; i++) {
                stmt.setObject(i, getRandomFieldValue(COLUMN_TYPES[i - 1]));
            }
        }

        /**
         * Returns a sample value for the given column type. Despite the name, only the
         * timestamp varies (current time); decimal and varchar values are constants.
         */
        private static Object getRandomFieldValue(int type) {
            switch (type) {
            case Types.DECIMAL:
                return 0;
            case Types.VARCHAR:
                return "null";
            case Types.TIMESTAMP:
                return new Timestamp(System.currentTimeMillis());
            default:
                throw new IllegalArgumentException("unsupported column type: " + type);
            }
        }
    }

    /**
     * Starts two runners in parallel and waits for both of them to finish.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        int num = 2;
        List<Thread> ts = new ArrayList<>();
        while (num-- > 0) {
            try {
                Thread t = new Thread(new TestRunner("runner_" + num, 10000, 1));
                t.start();
                // Only started threads are tracked; a failed runner must not leave a null entry.
                ts.add(t);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                // Restore the flag and stop waiting; further joins would fail immediately anyway.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                break;
            }
        }
    }
}
--
*Thanks and Regards, Siddharth Tyagi*
-- *Thanks and Regards, Siddharth Tyagi*
participants (1)
-
Siddharth Tyagi