Hi,
I am trying to insert data in multiple tables in parallel through prepared
statements batch execution.
I am facing a problem ,
first thread that executes the batch makes other executions to crash
because of the underlying exception - prepared statement is no longer
available.
Is there a limitation of updating schema in parallel jobs ?
If not then what may be the cause of this problem ?
Any light on how I may be able to implement this will be helpful.
>> code that executes batches
package monet.test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class BatchTest {
public static class TestRunner implements Runnable {
private final static String problemHere =
"jdbc:monetdb://localhost/test-db";
static {
try {
Class.forName("nl.cwi.monetdb.jdbc.MonetDriver");
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private static Connection createConnection() throws SQLException {
// happens one time per thread
return DriverManager.getConnection(problemHere, "monetdb", "monetdb");
}
private static int fieldCount = 10;
private static int[] colType = new int[fieldCount];
private static final int[] types = { Types.VARCHAR, Types.TIMESTAMP,
Types.DECIMAL };
static {
Random random = new Random();
// initialize column types
for (int i = 0; i < fieldCount; i++) {
colType[i] = types[random.nextInt(types.length)];
}
}
private final String name;
private Connection con;
private int batchCount;
private int batchSize;
private String create;
private String drop;
private String insert;
public TestRunner(String string, int bs, int bc) throws SQLException {
this.name = string;
this.con = createConnection();
this.batchCount = bc;
this.batchSize = bs;
this.create = "create table " + name + " (";
for (int i = 0; i < fieldCount; i++) {
create += (i == 0 ? "" : ",") + "col" + i + " " + getType(colType[i]);
}
this.create += ")";
this.insert = "insert into " + name + " values (";
for (int i = 0; i < fieldCount; i++) {
insert += (i == 0 ? "" : ",") + "?";
}
this.insert += ")";
this.drop = "drop table " + name;
}
private static String getType(int i) {
switch (i) {
case Types.DECIMAL:
return "decimal(18,9)";
case Types.VARCHAR:
return "varchar(30000)";
case Types.TIMESTAMP:
return "timestamp";
}
return null;
}
protected void finalize() throws Throwable {
if (con != null) {
con.close();
}
};
public void run() {
System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n",
name, create, insert, drop);
try (Statement stmt = con.createStatement()) {
// this will throw the exception of concurrency con
synchronized (problemHere) {
System.out.format("%d created %s%n", stmt.executeUpdate(create), name);
if (!con.getAutoCommit()) {
con.commit();
}
}
} catch (SQLException e1) {
e1.printStackTrace();
return;
}
try (PreparedStatement stmt = con.prepareStatement(insert)) {
while (batchCount-- > 0) {
for (int i = batchSize; i > 0; i--) {
setBatchInput(stmt);
stmt.addBatch();
}
System.out.format("%s - submitting batch ...%n", name);
stmt.executeBatch();
if (!con.getAutoCommit()) {
con.commit();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (con != null) {
try (Statement stmt = con.createStatement()) {
synchronized (problemHere) {
System.out.format("%d deleted - %s%n", stmt.executeUpdate(drop), name);
if (!con.getAutoCommit()) {
con.commit();
}
}
} catch (SQLException e1) {
e1.printStackTrace();
}
try {
con.close();
con = null;
} catch (SQLException e) {
e.printStackTrace();
}
}
}
System.out.format("%s finished.", name);
}
private static void setBatchInput(PreparedStatement stmt) throws
SQLException {
for (int i = 1; i <= fieldCount; i++) {
stmt.setObject(i, getRandomFieldValue(colType[i - 1]));
}
}
private static Object getRandomFieldValue(int type) {
switch (type) {
case Types.DECIMAL:
return 0;
case Types.VARCHAR:
return "null";
case Types.TIMESTAMP:
return new Timestamp(System.currentTimeMillis());
}
return null;
}
}
public static void main(String[] args) {
int num = 2;
List<Thread> ts = new ArrayList<>();
while (num-- > 0) {
Thread t = null;
try {
(t = new Thread(new TestRunner("runner_" + num, 10000, 1))).start();
} catch (SQLException e) {
e.printStackTrace();
}
ts.add(t);
}
for (Thread t : ts) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
On Wed, Nov 4, 2015 at 3:20 PM, Siddharth Tyagi <inbox.siddharth(a)gmail.com>
wrote:
> Hi,
> I am trying to insert data in multiple tables in parallel through prepared
> statements batch execution.
> I am facing a problem ,
>
> first thread that executes the batch makes other executions to crash
> because of the underlying exception - prepared statement is no longer
> available.
>
> Is there a limitation of updating schema in parallel jobs ?
> If not then what may be the cause of this problem ?
>
> Any light on how I may be able to implement this will be helpful.
>
> >> code that executes batches
>
> package monet.test;
>
> import java.sql.Connection;
> import java.sql.DriverManager;
> import java.sql.PreparedStatement;
> import java.sql.SQLException;
> import java.sql.Statement;
> import java.sql.Timestamp;
> import java.sql.Types;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Random;
>
> public class BatchTest {
> public static class TestRunner implements Runnable {
> private final static String problemHere =
> "jdbc:monetdb://localhost/test-db";
>
> static {
> try {
> Class.forName("nl.cwi.monetdb.jdbc.MonetDriver");
> } catch (ClassNotFoundException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
>
> private static Connection createConnection() throws SQLException {
> // happens one time per thread
> return DriverManager.getConnection(problemHere, "monetdb", "monetdb");
> }
>
> private static int fieldCount = 10;
> private static int[] colType = new int[fieldCount];
> private static final int[] types = { Types.VARCHAR, Types.TIMESTAMP,
> Types.DECIMAL };
>
> static {
> Random random = new Random();
> // initialize column types
> for (int i = 0; i < fieldCount; i++) {
> colType[i] = types[random.nextInt(types.length)];
> }
> }
>
> private final String name;
> private Connection con;
> private int batchCount;
> private int batchSize;
> private String create;
> private String drop;
> private String insert;
>
> public TestRunner(String string, int bs, int bc) throws SQLException {
> this.name = string;
> this.con = createConnection();
> this.batchCount = bc;
> this.batchSize = bs;
> this.create = "create table " + name + " (";
>
> for (int i = 0; i < fieldCount; i++) {
> create += (i == 0 ? "" : ",") + "col" + i + " " + getType(colType[i]);
> }
> this.create += ")";
> this.insert = "insert into " + name + " values (";
> for (int i = 0; i < fieldCount; i++) {
> insert += (i == 0 ? "" : ",") + "?";
> }
> this.insert += ")";
> this.drop = "drop table " + name;
> }
>
> private static String getType(int i) {
> switch (i) {
> case Types.DECIMAL:
> return "decimal(18,9)";
> case Types.VARCHAR:
> return "varchar(30000)";
> case Types.TIMESTAMP:
> return "timestamp";
> }
> return null;
> }
>
> protected void finalize() throws Throwable {
> if (con != null) {
> con.close();
> }
> };
>
> public void run() {
> System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n",
> name, create, insert, drop);
> try (Statement stmt = con.createStatement()) {
> // this will throw the exception of concurrency con
> synchronized (problemHere) {
> System.out.format("%d created %s%n", stmt.executeUpdate(create), name);
> if (!con.getAutoCommit()) {
> con.commit();
> }
> }
> } catch (SQLException e1) {
> e1.printStackTrace();
> return;
> }
> try (PreparedStatement stmt = con.prepareStatement(insert)) {
> while (batchCount-- > 0) {
> for (int i = batchSize; i > 0; i--) {
> setBatchInput(stmt);
> stmt.addBatch();
> }
> System.out.format("%s - submitting batch ...%n", name);
> stmt.executeBatch();
> if (!con.getAutoCommit()) {
> con.commit();
> }
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> if (con != null) {
> try (Statement stmt = con.createStatement()) {
> synchronized (problemHere) {
> System.out.format("%d deleted - %s%n", stmt.executeUpdate(drop), name);
> if (!con.getAutoCommit()) {
> con.commit();
> }
> }
> } catch (SQLException e1) {
> e1.printStackTrace();
> }
> try {
> con.close();
> con = null;
> } catch (SQLException e) {
> e.printStackTrace();
> }
> }
> }
> System.out.format("%s finished.", name);
> }
>
> private static void setBatchInput(PreparedStatement stmt) throws
> SQLException {
> for (int i = 1; i <= fieldCount; i++) {
> stmt.setObject(i, getRandomFieldValue(colType[i - 1]));
> }
> }
>
> private static Object getRandomFieldValue(int type) {
> switch (type) {
> case Types.DECIMAL:
> return 0;
> case Types.VARCHAR:
> return "null";
> case Types.TIMESTAMP:
> return new Timestamp(System.currentTimeMillis());
> }
> return null;
> }
> }
>
> public static void main(String[] args) {
> int num = 2;
> List<Thread> ts = new ArrayList<>();
> while (num-- > 0) {
> Thread t = null;
> try {
> (t = new Thread(new TestRunner("runner_" + num, 10000, 1))).start();
> } catch (SQLException e) {
> e.printStackTrace();
> }
> ts.add(t);
> }
> for (Thread t : ts) {
> try {
> t.join();
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> }
> }
>
> }
>
> --
>
>
> *Thanks and RegardsSiddharth Tyagi*
>
--
*Thanks and RegardsSiddharth Tyagi*