Hi,
I am trying to insert data in multiple tables in parallel through prepared statements batch execution.
I am facing a problem ,

first thread that executes the batch makes other executions to crash because of the underlying exception - prepared statement is no longer available.

Is there a limitation of updating schema in parallel jobs ?
If not then what may be the cause of this problem ? 

Any light on how I may be able to implement this will be helpful.

>> code that executes batches

package monet.test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class BatchTest {
    public static class TestRunner implements Runnable {
private final static String problemHere = "jdbc:monetdb://localhost/test-db";

static {
    try {
Class.forName("nl.cwi.monetdb.jdbc.MonetDriver");
    } catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
    }
}

private static Connection createConnection() throws SQLException {
    // happens one time per thread
    return DriverManager.getConnection(problemHere, "monetdb", "monetdb");
}

private static int fieldCount = 10;
private static int[] colType = new int[fieldCount];
private static final int[] types = { Types.VARCHAR, Types.TIMESTAMP, Types.DECIMAL };

static {
    Random random = new Random();
    // initialize column types
    for (int i = 0; i < fieldCount; i++) {
colType[i] = types[random.nextInt(types.length)];
    }
}

private final String name;
private Connection con;
private int batchCount;
private int batchSize;
private String create;
private String drop;
private String insert;

public TestRunner(String string, int bs, int bc) throws SQLException {
    this.name = string;
    this.con = createConnection();
    this.batchCount = bc;
    this.batchSize = bs;
    this.create = "create table " + name + " (";

    for (int i = 0; i < fieldCount; i++) {
create += (i == 0 ? "" : ",") + "col" + i + " " + getType(colType[i]);
    }
    this.create += ")";
    this.insert = "insert into " + name + " values (";
    for (int i = 0; i < fieldCount; i++) {
insert += (i == 0 ? "" : ",") + "?";
    }
    this.insert += ")";
    this.drop = "drop table " + name;
}

private static String getType(int i) {
    switch (i) {
    case Types.DECIMAL:
return "decimal(18,9)";
    case Types.VARCHAR:
return "varchar(30000)";
    case Types.TIMESTAMP:
return "timestamp";
    }
    return null;
}

protected void finalize() throws Throwable {
    if (con != null) {
con.close();
    }
};

public void run() {
    System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n", name, create, insert, drop);
    try (Statement stmt = con.createStatement()) {
// this will throw the exception of concurrency con
synchronized (problemHere) {
    System.out.format("%d created %s%n", stmt.executeUpdate(create), name);
    if (!con.getAutoCommit()) {
con.commit();
    }
}
    } catch (SQLException e1) {
e1.printStackTrace();
return;
    }
    try (PreparedStatement stmt = con.prepareStatement(insert)) {
while (batchCount-- > 0) {
    for (int i = batchSize; i > 0; i--) {
setBatchInput(stmt);
stmt.addBatch();
    }
    System.out.format("%s - submitting batch ...%n", name);
    stmt.executeBatch();
    if (!con.getAutoCommit()) {
con.commit();
    }
}
    } catch (Exception e) {
e.printStackTrace();
    } finally {
if (con != null) {
    try (Statement stmt = con.createStatement()) {
synchronized (problemHere) {
    System.out.format("%d deleted - %s%n", stmt.executeUpdate(drop), name);
    if (!con.getAutoCommit()) {
con.commit();
    }
}
    } catch (SQLException e1) {
e1.printStackTrace();
    }
    try {
con.close();
con = null;
    } catch (SQLException e) {
e.printStackTrace();
    }
}
    }
    System.out.format("%s finished.", name);
}

private static void setBatchInput(PreparedStatement stmt) throws SQLException {
    for (int i = 1; i <= fieldCount; i++) {
stmt.setObject(i, getRandomFieldValue(colType[i - 1]));
    }
}

private static Object getRandomFieldValue(int type) {
    switch (type) {
    case Types.DECIMAL:
return 0;
    case Types.VARCHAR:
return "null";
    case Types.TIMESTAMP:
return new Timestamp(System.currentTimeMillis());
    }
    return null;
}
    }

    public static void main(String[] args) {
int num = 2;
List<Thread> ts = new ArrayList<>();
while (num-- > 0) {
    Thread t = null;
    try {
(t = new Thread(new TestRunner("runner_" + num, 10000, 1))).start();
    } catch (SQLException e) {
e.printStackTrace();
    }
    ts.add(t);
}
for (Thread t : ts) {
    try {
t.join();
    } catch (InterruptedException e) {
e.printStackTrace();
    }
}
    }

}

On Wed, Nov 4, 2015 at 3:20 PM, Siddharth Tyagi <inbox.siddharth@gmail.com> wrote:
Hi,
I am trying to insert data in multiple tables in parallel through prepared statements batch execution.
I am facing a problem ,

first thread that executes the batch makes other executions to crash because of the underlying exception - prepared statement is no longer available.

Is there a limitation of updating schema in parallel jobs ?
If not then what may be the cause of this problem ? 

Any light on how I may be able to implement this will be helpful.

>> code that executes batches

package monet.test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class BatchTest {
    public static class TestRunner implements Runnable {
private final static String problemHere = "jdbc:monetdb://localhost/test-db";

static {
   try {
Class.forName("nl.cwi.monetdb.jdbc.MonetDriver");
   } catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
   }
}

private static Connection createConnection() throws SQLException {
   // happens one time per thread
   return DriverManager.getConnection(problemHere, "monetdb", "monetdb");
}

private static int fieldCount = 10;
private static int[] colType = new int[fieldCount];
private static final int[] types = { Types.VARCHAR, Types.TIMESTAMP, Types.DECIMAL };

static {
   Random random = new Random();
   // initialize column types
   for (int i = 0; i < fieldCount; i++) {
colType[i] = types[random.nextInt(types.length)];
   }
}

private final String name;
private Connection con;
private int batchCount;
private int batchSize;
private String create;
private String drop;
private String insert;

public TestRunner(String string, int bs, int bc) throws SQLException {
   this.name = string;
   this.con = createConnection();
   this.batchCount = bc;
   this.batchSize = bs;
   this.create = "create table " + name + " (";

   for (int i = 0; i < fieldCount; i++) {
create += (i == 0 ? "" : ",") + "col" + i + " " + getType(colType[i]);
   }
   this.create += ")";
   this.insert = "insert into " + name + " values (";
   for (int i = 0; i < fieldCount; i++) {
insert += (i == 0 ? "" : ",") + "?";
   }
   this.insert += ")";
   this.drop = "drop table " + name;
}

private static String getType(int i) {
   switch (i) {
   case Types.DECIMAL:
return "decimal(18,9)";
   case Types.VARCHAR:
return "varchar(30000)";
   case Types.TIMESTAMP:
return "timestamp";
   }
   return null;
}

protected void finalize() throws Throwable {
   if (con != null) {
con.close();
   }
};

public void run() {
   System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n", name, create, insert, drop);
   try (Statement stmt = con.createStatement()) {
// this will throw the exception of concurrency con
synchronized (problemHere) {
   System.out.format("%d created %s%n", stmt.executeUpdate(create), name);
   if (!con.getAutoCommit()) {
con.commit();
   }
}
   } catch (SQLException e1) {
e1.printStackTrace();
return;
   }
   try (PreparedStatement stmt = con.prepareStatement(insert)) {
while (batchCount-- > 0) {
   for (int i = batchSize; i > 0; i--) {
setBatchInput(stmt);
stmt.addBatch();
   }
   System.out.format("%s - submitting batch ...%n", name);
   stmt.executeBatch();
   if (!con.getAutoCommit()) {
con.commit();
   }
}
   } catch (Exception e) {
e.printStackTrace();
   } finally {
if (con != null) {
   try (Statement stmt = con.createStatement()) {
synchronized (problemHere) {
   System.out.format("%d deleted - %s%n", stmt.executeUpdate(drop), name);
   if (!con.getAutoCommit()) {
con.commit();
   }
}
   } catch (SQLException e1) {
e1.printStackTrace();
   }
   try {
con.close();
con = null;
   } catch (SQLException e) {
e.printStackTrace();
   }
}
   }
   System.out.format("%s finished.", name);
}

private static void setBatchInput(PreparedStatement stmt) throws SQLException {
   for (int i = 1; i <= fieldCount; i++) {
stmt.setObject(i, getRandomFieldValue(colType[i - 1]));
   }
}

private static Object getRandomFieldValue(int type) {
   switch (type) {
   case Types.DECIMAL:
return 0;
   case Types.VARCHAR:
return "null";
   case Types.TIMESTAMP:
return new Timestamp(System.currentTimeMillis());
   }
   return null;
}
    }

    public static void main(String[] args) {
int num = 2;
List<Thread> ts = new ArrayList<>();
while (num-- > 0) {
   Thread t = null;
   try {
(t = new Thread(new TestRunner("runner_" + num, 10000, 1))).start();
   } catch (SQLException e) {
e.printStackTrace();
   }
   ts.add(t);
}
for (Thread t : ts) {
   try {
t.join();
   } catch (InterruptedException e) {
e.printStackTrace();
   }
}
    }

}

--
Thanks and Regards
Siddharth Tyagi




--
Thanks and Regards
Siddharth Tyagi