I am trying to insert data into multiple tables in parallel using batched execution of prepared statements.
The first thread that executes its batch causes the other threads' executions to fail with the underlying exception "prepared statement is no longer available".
Any light on how I might implement this would be helpful.
package monet.test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
 * Small stress test that inserts batches of rows into several tables in
 * parallel, one table and one JDBC connection per thread.
 *
 * <p>Each {@link TestRunner} creates its own table, fills it through a batched
 * {@link PreparedStatement}, drops the table again and closes its connection.
 */
public class BatchTest {

    /**
     * Runs the create / batch-insert / drop cycle for a single table on a
     * dedicated connection.
     *
     * <p>The connection is opened in the constructor and is always closed by
     * {@link #run()}; a runner that is constructed must therefore be run
     * exactly once, otherwise its connection is leaked.
     */
    public static class TestRunner implements Runnable {

        private static final String JDBC_URL = "jdbc:monetdb://localhost/test-db";

        /**
         * Dedicated monitor used to serialize DDL across runners. A private
         * object is used instead of a String constant because string literals
         * are interned and could be locked by unrelated code.
         */
        private static final Object DDL_LOCK = new Object();

        private static final int FIELD_COUNT = 10;
        private static final int[] SUPPORTED_TYPES = { Types.VARCHAR, Types.TIMESTAMP, Types.DECIMAL };

        /** JDBC type of every column; chosen randomly once and shared by all runners. */
        private static final int[] COLUMN_TYPES = new int[FIELD_COUNT];

        static {
            try {
                Class.forName("nl.cwi.monetdb.jdbc.MonetDriver");
            } catch (ClassNotFoundException e) {
                // Not fatal here: DriverManager reports a missing driver when a
                // connection is requested.
                System.err.println("MonetDB JDBC driver not found on the classpath: " + e.getMessage());
            }
            Random random = new Random();
            for (int i = 0; i < FIELD_COUNT; i++) {
                COLUMN_TYPES[i] = SUPPORTED_TYPES[random.nextInt(SUPPORTED_TYPES.length)];
            }
        }

        private final String name;
        private final Connection con;
        private final int batchCount;
        private final int batchSize;
        private final String create;
        private final String drop;
        private final String insert;

        /**
         * Prepares the SQL statements for one table and opens the connection
         * this runner will use.
         *
         * @param string name of the table to create, fill and drop; it is
         *               concatenated into the SQL text, so it must be a trusted
         *               identifier
         * @param bs     number of rows added to each batch
         * @param bc     number of batches to submit
         * @throws IllegalArgumentException if {@code string} is null or empty
         * @throws SQLException             if the connection cannot be opened
         */
        public TestRunner(String string, int bs, int bc) throws SQLException {
            if (string == null || string.isEmpty()) {
                throw new IllegalArgumentException("table name must not be null or empty");
            }
            this.name = string;
            this.batchSize = bs;
            this.batchCount = bc;

            StringBuilder columns = new StringBuilder();
            StringBuilder placeholders = new StringBuilder();
            for (int i = 0; i < FIELD_COUNT; i++) {
                if (i > 0) {
                    columns.append(',');
                    placeholders.append(',');
                }
                columns.append("col").append(i).append(' ').append(getType(COLUMN_TYPES[i]));
                placeholders.append('?');
            }
            this.create = "create table " + name + " (" + columns + ")";
            this.insert = "insert into " + name + " values (" + placeholders + ")";
            this.drop = "drop table " + name;

            // Opened last so that nothing above can fail and leak the connection.
            this.con = createConnection();
        }

        /** Opens a new connection; called once per runner. */
        private static Connection createConnection() throws SQLException {
            return DriverManager.getConnection(JDBC_URL, "monetdb", "monetdb");
        }

        /**
         * Maps a JDBC type code to the SQL column type used in the test table.
         *
         * @throws IllegalArgumentException for a type that is not one of the supported types
         */
        private static String getType(int jdbcType) {
            switch (jdbcType) {
            case Types.DECIMAL:
                return "decimal(18,9)";
            case Types.VARCHAR:
                return "varchar(30000)";
            case Types.TIMESTAMP:
                return "timestamp";
            default:
                throw new IllegalArgumentException("Unsupported JDBC type: " + jdbcType);
            }
        }

        /**
         * Creates the table, submits the configured batches, then drops the
         * table and closes the connection. Failures are printed to standard
         * error; they never propagate out of this method.
         */
        @Override
        public void run() {
            System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n", name, create, insert, drop);
            try {
                createTable();
            } catch (SQLException e) {
                e.printStackTrace();
                // Nothing was created, so there is nothing to drop; just release the connection.
                closeConnection();
                return;
            }
            try {
                insertBatches();
            } catch (SQLException | RuntimeException e) {
                e.printStackTrace();
                // Discard the failed transaction so the drop below starts from a clean state.
                rollbackQuietly();
            } finally {
                dropTable();
                closeConnection();
            }
            System.out.format("%s finished.%n", name);
        }

        /**
         * Executes the CREATE TABLE statement.
         *
         * <p>DDL is serialized across runners: the original author observed a
         * concurrency conflict when it ran in parallel.
         * NOTE(review): whether DDL on one connection invalidates prepared
         * statements of other connections depends on the server - confirm
         * against the MonetDB documentation.
         */
        private void createTable() throws SQLException {
            try (Statement stmt = con.createStatement()) {
                synchronized (DDL_LOCK) {
                    System.out.format("%d created %s%n", stmt.executeUpdate(create), name);
                    commitIfNeeded();
                }
            }
        }

        /** Fills and submits {@code batchCount} batches of {@code batchSize} rows each. */
        private void insertBatches() throws SQLException {
            try (PreparedStatement stmt = con.prepareStatement(insert)) {
                for (int batch = 0; batch < batchCount; batch++) {
                    for (int row = 0; row < batchSize; row++) {
                        setBatchInput(stmt);
                        stmt.addBatch();
                    }
                    System.out.format("%s - submitting batch ...%n", name);
                    stmt.executeBatch();
                    commitIfNeeded();
                }
            }
        }

        /** Drops the table; a failure is reported but does not stop the cleanup. */
        private void dropTable() {
            try (Statement stmt = con.createStatement()) {
                synchronized (DDL_LOCK) {
                    System.out.format("%d deleted - %s%n", stmt.executeUpdate(drop), name);
                    commitIfNeeded();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        /** Commits the current transaction unless the connection is in auto-commit mode. */
        private void commitIfNeeded() throws SQLException {
            if (!con.getAutoCommit()) {
                con.commit();
            }
        }

        /** Rolls back the current transaction, if any; failures are only reported. */
        private void rollbackQuietly() {
            try {
                if (!con.getAutoCommit()) {
                    con.rollback();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        /** Closes the connection; failures are only reported. */
        private void closeConnection() {
            try {
                con.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        /** Binds one value per column on the given statement (parameters are 1-based). */
        private static void setBatchInput(PreparedStatement stmt) throws SQLException {
            for (int i = 1; i <= FIELD_COUNT; i++) {
                stmt.setObject(i, getRandomFieldValue(COLUMN_TYPES[i - 1]));
            }
        }

        /**
         * Returns a sample value for the given JDBC type. Despite the name the
         * values are fixed, apart from the timestamp which is the current time.
         *
         * @throws IllegalArgumentException for a type that is not one of the supported types
         */
        private static Object getRandomFieldValue(int type) {
            switch (type) {
            case Types.DECIMAL:
                return 0;
            case Types.VARCHAR:
                return "null";
            case Types.TIMESTAMP:
                return new Timestamp(System.currentTimeMillis());
            default:
                throw new IllegalArgumentException("Unsupported JDBC type: " + type);
            }
        }
    }

    /**
     * Starts two runners, each on its own thread, and waits for them to finish.
     * A runner that cannot be created (for example because no connection can
     * be opened) is reported and skipped.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final int runnerCount = 2;
        List<Thread> threads = new ArrayList<>();
        for (int i = runnerCount - 1; i >= 0; i--) {
            String runnerName = "runner_" + i;
            try {
                Thread thread = new Thread(new TestRunner(runnerName, 10000, 1), runnerName);
                thread.start();
                // Only started threads are recorded, so join() below never sees null.
                threads.add(thread);
            } catch (SQLException e) {
                System.err.println("Could not create " + runnerName + ":");
                e.printStackTrace();
            }
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                // Restore the flag and stop waiting; further join() calls would throw again.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}