Distributed Query Processing

MonetDB supports distributed query processing through a straightforward master/worker architecture, based on the following general concepts:

  • Users can transparently query data that is spread (either replicated or sharded) over multiple MonetDB instances.

  • Every instance in the cluster is a full-fledged MonetDB server, so all instances have the same functionality. The cluster can be set up so that a MonetDB client can connect to any one of those instances and run all supported queries.

  • Distributed queries are queries that involve data on remote MonetDB instances. The instance on which a query is started automatically becomes the master for that particular query. The instances holding the remote data addressed by the query are its workers.

  • The master is responsible for parsing the query, generating the distributed query plan, sending the subqueries to the workers and merging the subquery results into the final result.

  • Every instance can act as a master and a worker, depending on data placement and where a distributed query has been started.
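
As a toy illustration in plain Python (not MonetDB code), the role assignment described above reduces to one rule per query: the instance that receives the query is the master, and every other instance storing data the query reads is a worker. The function `assign_roles` and the `placement` mapping are hypothetical; the placement mirrors the tables created in the examples.

```python
from typing import Dict, Iterable, List, Tuple


def assign_roles(start_instance: str,
                 placement: Dict[str, str],
                 tables_in_query: Iterable[str]) -> Tuple[str, List[str]]:
    """Return (master, sorted workers) for one query.

    start_instance:  the instance the client sent the query to.
    placement:       hypothetical map from table name to the instance storing it.
    tables_in_query: the tables the query reads.
    """
    master = start_instance  # whoever receives the query is its master
    # Workers: every *other* instance that stores data the query touches.
    workers = {placement[t] for t in tables_in_query} - {master}
    return master, sorted(workers)


# Data placement used in the examples: s<i> and t<i> live on mdb<i>.
placement = {"s1": "mdb1", "t1": "mdb1",
             "s2": "mdb2", "t2": "mdb2",
             "s3": "mdb3", "t3": "mdb3"}

# The same instance can be master for one query and worker for another.
print(assign_roles("mdb3", placement, ["t1", "t2", "t3"]))  # ('mdb3', ['mdb1', 'mdb2'])
print(assign_roles("mdb1", placement, ["t3"]))              # ('mdb1', ['mdb3'])
```
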

To identify data distributed over multiple MonetDB instances, we have extended MonetDB with three new types of SQL TABLEs:

  • Remote table: one can create a REMOTE TABLE on one MonetDB server to refer to a table that physically exists on another (i.e. remote) MonetDB server, by giving the URL of the remote server and the name of the referenced table on that remote server.

  • Replica table: one can create a REPLICA TABLE and add both local tables (i.e. normal SQL tables created on the same MonetDB server as this REPLICA TABLE) and remote tables (i.e. the aforementioned REMOTE TABLEs) to it. The keyword REPLICA indicates that all members are exact replicas of each other, so the system can use whichever member is most convenient.

    For performance reasons, MonetDB does not check whether the members of a REPLICA TABLE are indeed identical. This responsibility is left to the database users.

  • Merge table: one can create a MERGE TABLE and add both local and remote tables to it to form a single big table. The result behaves as a UNION ALL of all partition tables in the MERGE TABLE and the system will potentially have to access all members to answer a query. More details about merge tables can be found in Data Partitioning.
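
The semantics of the three table types can be summarized with a small in-memory model. This is a hypothetical Python sketch, not MonetDB's implementation: `fetch` stands in for whatever transport the master uses to read a remote table, and the replica model simply reads its first member, whereas in MonetDB the optimizer decides which member to use.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple

Row = Tuple  # a row is a plain tuple of column values


@dataclass
class LocalTable:
    name: str
    rows: List[Row]

    def scan(self) -> List[Row]:
        return list(self.rows)


@dataclass
class RemoteTable:
    """Only a reference: a server URL plus the table's name on that server."""
    name: str
    url: str
    fetch: Callable[[str, str], List[Row]]  # hypothetical transport to a worker

    def scan(self) -> List[Row]:
        return self.fetch(self.url, self.name)


@dataclass
class ReplicaTable:
    name: str
    members: list = field(default_factory=list)

    def scan(self) -> List[Row]:
        # Members are assumed identical (MonetDB does not verify this), so any
        # single member answers the query; this toy model reads the first one.
        return self.members[0].scan()


@dataclass
class MergeTable:
    name: str
    members: list = field(default_factory=list)

    def scan(self) -> List[Row]:
        # UNION ALL: concatenate all members without duplicate elimination.
        return [row for member in self.members for row in member.scan()]


# Fake worker contents, keyed by (URL, table name), mirroring the examples.
WORKERS: Dict[Tuple[str, str], List[Row]] = {
    ("mapi:monetdb://localhost:60001/mdb1", "t1"): [("abc",), ("efg",)],
    ("mapi:monetdb://localhost:60002/mdb2", "t2"): [("foo",), ("bar",)],
}


def fetch(url: str, name: str) -> List[Row]:
    return list(WORKERS[(url, name)])


t1 = RemoteTable("t1", "mapi:monetdb://localhost:60001/mdb1", fetch)
t2 = RemoteTable("t2", "mapi:monetdb://localhost:60002/mdb2", fetch)
t3 = LocalTable("t3", [("baz",), ("qux",)])
mrgT = MergeTable("mrgT", [t2, t1, t3])
repS = ReplicaTable("repS", [LocalTable("s3", [(23,), (42,)])])

print(mrgT.scan())  # [('foo',), ('bar',), ('abc',), ('efg',), ('baz',), ('qux',)]
print(repS.scan())  # [(23,), (42,)]
```
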

With these extensions, users now have different ways to query data on a remote server: directly on the remote server as a normal table, or through the local server as a REMOTE TABLE or as part of a REPLICA TABLE/MERGE TABLE.

Queries involving REMOTE TABLEs are automatically split into subqueries by the master and executed on the remote workers. The combination of MERGE, REPLICA and REMOTE tables enables fine-grained control over how data and query workloads are distributed, so as to make the most of the available CPU and RAM resources.
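
To make the splitting step concrete, here is a toy Python sketch of how a master might group one subquery per partition of a MERGE TABLE by the instance that should run it. The function `split_query`, the partition description and the generated SQL text are all hypothetical; MonetDB's actual plan for such a query is shown in Example 3.

```python
from typing import Dict, List, Optional, Tuple

# (partition table name, URL of the worker storing it, or None if local)
Partition = Tuple[str, Optional[str]]


def split_query(columns: str, condition: str,
                partitions: List[Partition]) -> Dict[str, List[str]]:
    """Group one subquery per partition by the instance that should run it.

    The SQL text is illustrative only; it is not the literal form in which
    MonetDB ships work to its workers.
    """
    plan: Dict[str, List[str]] = {}
    for table, url in partitions:
        target = url if url is not None else "local"
        plan.setdefault(target, []).append(
            f"SELECT {columns} FROM {table} WHERE {condition}")
    return plan


# The MERGE TABLE mrgT from the examples: t2 and t1 are remote, t3 is local.
mrgT_parts = [("t2", "mapi:monetdb://localhost:60002/mdb2"),
              ("t1", "mapi:monetdb://localhost:60001/mdb1"),
              ("t3", None)]
for target, subqueries in split_query("s", "s <> 'bla'", mrgT_parts).items():
    print(target, subqueries)
```

The master would then run the "local" subqueries itself and concatenate all partial results, which is the UNION ALL semantics of a MERGE TABLE.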

Examples

Here we show several simple examples of how distributed queries on REPLICA and MERGE tables containing both normal local and REMOTE tables work.

Step 1. Set up a small cluster of three MonetDB servers mdb1, mdb2 and mdb3 running on ports 60001, 60002 and 60003, respectively.

$ mserver5 --dbpath=/tmp/mdb1 --set mapi_port=60001 --set monet_daemon=yes &
$ mserver5 --dbpath=/tmp/mdb2 --set mapi_port=60002 --set monet_daemon=yes &
$ mserver5 --dbpath=/tmp/mdb3 --set mapi_port=60003 --set monet_daemon=yes &
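
If you prefer to script Step 1, the three command lines can be generated from Python. This is a convenience sketch: the helper `mserver5_command` is hypothetical and only reproduces the flags used above (the database name is the last component of --dbpath).

```python
import shlex
from typing import List


def mserver5_command(name: str, port: int, root: str = "/tmp") -> List[str]:
    """Argument vector for one instance, using the same flags as Step 1."""
    return ["mserver5", f"--dbpath={root}/{name}",
            "--set", f"mapi_port={port}",
            "--set", "monet_daemon=yes"]


# mdb1..mdb3 on ports 60001..60003, as in Step 1.
cluster = {f"mdb{i}": 60000 + i for i in range(1, 4)}
for name, port in cluster.items():
    print(shlex.join(mserver5_command(name, port)))
```

To actually start each server in the background instead of printing the command, the same argument list can be passed to `subprocess.Popen`.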

Step 2. Connect to the MonetDB servers to create and populate two simple tables on each of them. The tables s1, s2 and s3 will be used in a REPLICA TABLE, so we insert identical data into each of them. The tables t1, t2 and t3 will be used in a MERGE TABLE, so we insert different values into each of them:

$ mclient -d mdb1 -u monetdb -p 60001
...
sql>CREATE TABLE s1 (i INT);
operation successful
sql>INSERT INTO s1 VALUES (23), (42);
2 affected rows
sql>CREATE TABLE t1 (s VARCHAR(10));
operation successful
sql>INSERT INTO t1 VALUES ('abc'), ('efg');
2 affected rows


$ mclient -d mdb2 -u monetdb -p 60002
...
sql>CREATE TABLE s2 (i INT);
operation successful
sql>INSERT INTO s2 VALUES (23), (42);
2 affected rows
sql>CREATE TABLE t2 (s VARCHAR(10));
operation successful
sql>INSERT INTO t2 VALUES ('foo'), ('bar');
2 affected rows

$ mclient -d mdb3 -u monetdb -p 60003
...
sql>CREATE TABLE s3 (i INT);
operation successful
sql>INSERT INTO s3 VALUES (23), (42);
2 affected rows
sql>CREATE TABLE t3 (s VARCHAR(10));
operation successful
sql>INSERT INTO t3 VALUES ('baz'), ('qux');
2 affected rows
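
The statements typed in Step 2 follow a simple pattern: every instance gets the same s<i> rows and its own t<i> rows. A small, hypothetical Python generator makes that pattern explicit; its output could, for instance, be saved to one file per instance and run with mclient.

```python
from typing import List

S_VALUES = (23, 42)  # identical on every instance (future REPLICA members)
T_VALUES = {1: ("abc", "efg"), 2: ("foo", "bar"), 3: ("baz", "qux")}  # distinct


def setup_sql(i: int) -> List[str]:
    """SQL statements Step 2 runs on instance mdb<i>."""
    s_vals = ", ".join(f"({v})" for v in S_VALUES)
    t_vals = ", ".join(f"('{v}')" for v in T_VALUES[i])
    return [f"CREATE TABLE s{i} (i INT);",
            f"INSERT INTO s{i} VALUES {s_vals};",
            f"CREATE TABLE t{i} (s VARCHAR(10));",
            f"INSERT INTO t{i} VALUES {t_vals};"]


for i in (1, 2, 3):
    print(f"-- mdb{i}, port {60000 + i}")
    print("\n".join(setup_sql(i)))
```
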

Example 1: create and query REMOTE TABLEs

We choose mdb3 as the master and mdb1 and mdb2 as the workers. So we continue using the connection to mdb3 and create remote tables there that refer to the tables created earlier on mdb1 and mdb2.

Important things to know about REMOTE TABLEs:

  • The format of the URL of a REMOTE TABLE is: mapi:monetdb://<host>:<port>/<dbname>, where all three parameters (i.e. host, port and dbname) are compulsory.

  • The declaration of a REMOTE TABLE must exactly match the signature of its counterpart in the remote database, i.e., the same table name, the same column names and the same column data types.

  • The remote database server is not contacted at creation time to verify the existence of the table. When a CREATE REMOTE TABLE statement reports “operation successful”, it merely means that the information about the new remote table has been added to the local SQL catalogue. The check at the remote database server is delayed until the first actual query on the remote table.

sql>CREATE REMOTE TABLE s1 (i int) on 'mapi:monetdb://localhost:60001/mdb1';
operation successful
sql>CREATE REMOTE TABLE t1 (s varchar(10)) on 'mapi:monetdb://localhost:60001/mdb1';
operation successful
sql>CREATE REMOTE TABLE s2 (i int) on 'mapi:monetdb://localhost:60002/mdb2';
operation successful
sql>CREATE REMOTE TABLE t2 (s VARCHAR(10)) ON 'mapi:monetdb://localhost:60002/mdb2';
operation successful
sql>SELECT * FROM s1; -- identical to s2
+------+
| i    |
+======+
|   23 |
|   42 |
+------+
2 tuples

sql>SELECT * FROM t1; -- no overlap with t2
+------+
| s    |
+======+
| abc  |
| efg  |
+------+
2 tuples

sql>SELECT * FROM s2; -- identical to s1
+------+
| i    |
+======+
|   23 |
|   42 |
+------+
2 tuples

sql>SELECT * FROM t2; -- no overlap with t1
+------+
| s    |
+======+
| foo  |
| bar  |
+------+
2 tuples
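
The three rules for REMOTE TABLEs listed before this session can be modeled in a few lines of Python. Everything here is hypothetical: `parse_remote_url` checks the URL shape, `_normalize` compares signatures (with the simplification noted in the code), and `RemoteTableEntry` defers all checks until its first query, mirroring the rule that CREATE REMOTE TABLE does not contact the remote server.

```python
from typing import Callable, List, Optional, Tuple
from urllib.parse import urlsplit

Column = Tuple[str, str]  # (column name, SQL type), e.g. ("s", "VARCHAR(10)")
# Hypothetical hook: ask (host, port, dbname) for a table's columns, None if absent.
Describe = Callable[[str, int, str, str], Optional[List[Column]]]


def parse_remote_url(url: str) -> Tuple[str, int, str]:
    """Split mapi:monetdb://<host>:<port>/<dbname>; all three parts are required."""
    if not url.startswith("mapi:"):
        raise ValueError(f"not a mapi URL: {url!r}")
    parts = urlsplit(url[len("mapi:"):])  # monetdb://host:port/dbname
    dbname = parts.path.lstrip("/")
    if (parts.scheme != "monetdb" or not parts.hostname
            or parts.port is None or not dbname or "/" in dbname):
        raise ValueError(f"expected mapi:monetdb://<host>:<port>/<dbname>, got {url!r}")
    return parts.hostname, parts.port, dbname


def _normalize(cols: List[Column]) -> List[Column]:
    # Simplification: names compared case-insensitively, types by spelling,
    # so INT vs INTEGER would be reported as a mismatch here.
    return [(name.lower(), sqltype.upper()) for name, sqltype in cols]


class RemoteTableEntry:
    """Toy catalogue entry: creating it never contacts the remote server."""

    def __init__(self, name: str, cols: List[Column], url: str, describe: Describe):
        self.name, self.cols, self.url = name, cols, url
        self._describe = describe
        self._verified = False

    def verify_on_first_query(self) -> None:
        """Deferred check: the table exists remotely with the same signature."""
        if self._verified:
            return
        host, port, dbname = parse_remote_url(self.url)
        remote_cols = self._describe(host, port, dbname, self.name)
        if remote_cols is None:
            raise LookupError(f"table {self.name} not found at {self.url}")
        if _normalize(remote_cols) != _normalize(self.cols):
            raise TypeError(f"{self.name} does not match its remote definition")
        self._verified = True


# A stand-in for mdb1's catalogue.
MDB1 = {("localhost", 60001, "mdb1", "t1"): [("s", "VARCHAR(10)")]}


def describe(host: str, port: int, dbname: str, table: str) -> Optional[List[Column]]:
    return MDB1.get((host, port, dbname, table))


url = "mapi:monetdb://localhost:60001/mdb1"
ok = RemoteTableEntry("t1", [("s", "varchar(10)")], url, describe)
ok.verify_on_first_query()  # passes: same name, columns and types

missing = RemoteTableEntry("t9", [("s", "VARCHAR(10)")], url, describe)
# Creating `missing` succeeded; missing.verify_on_first_query() raises LookupError.
```
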

Example 2: create and query a REPLICA TABLE

Continuing with the connection to mdb3, we create a REPLICA TABLE repS and add s1, s2 and s3 to it. Hence, repS contains three replicas: one local replica and two remote replicas. Selecting from repS returns results as if it were a single table. The logical query plan shows that only one replica is used, namely the local replica.

sql>CREATE REPLICA TABLE repS (i INT);
operation successful
sql>ALTER TABLE repS ADD TABLE s2;
operation successful
sql>ALTER TABLE repS ADD TABLE s1;
operation successful
sql>ALTER TABLE repS ADD TABLE s3;
operation successful
sql>SELECT * FROM repS;
+------+
| i    |
+======+
|   23 |
|   42 |
+------+
2 tuples
sql>PLAN SELECT * FROM repS;
+-----------------------------------------------------------------------------------------+
| rel                                                                                     |
+=========================================================================================+
| project (                                                                               |
| | table("sys"."s3") [ "s3"."i" UNIQUE MIN "23" MAX "42" NUNIQUES 2.000000 as "reps"."i",|
| |                     "s3"."%TID%" NOT NULL UNIQUE as "reps"."%TID%" ] COUNT 2          |
| ) [ "reps"."i" UNIQUE NUNIQUES 2.000000 MIN "23" MAX "42" ] COUNT 2                     |
+-----------------------------------------------------------------------------------------+
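
The plan shows that the master reads the local member s3 instead of fetching a remote copy. The Python sketch below pairs a selection rule consistent with this one plan with a consistency check that users could run themselves, since MonetDB leaves verifying replica contents to them. Both functions are hypothetical; "prefer a local member" is an assumption, not a documented description of MonetDB's optimizer.

```python
from collections import Counter
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Member:
    name: str
    url: Optional[str] = None  # None: a local table on this instance


def choose_replica(members: List[Member]) -> Member:
    """Pick one member of a REPLICA TABLE, preferring a local one (assumption)."""
    if not members:
        raise ValueError("REPLICA TABLE has no members")
    return next((m for m in members if m.url is None), members[0])


def replicas_identical(contents: List[list]) -> bool:
    """User-side check: same rows with the same multiplicities, in any order."""
    first = Counter(contents[0])
    return all(Counter(rows) == first for rows in contents[1:])


# repS as built above: s2 and s1 are remote, s3 is local to mdb3.
repS = [Member("s2", "mapi:monetdb://localhost:60002/mdb2"),
        Member("s1", "mapi:monetdb://localhost:60001/mdb1"),
        Member("s3")]
print(choose_replica(repS).name)  # s3

# Rows as they might be returned by SELECT * FROM s1, s2 and s3 respectively.
print(replicas_identical([[(23,), (42,)], [(42,), (23,)], [(23,), (42,)]]))  # True
```
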

Example 3: create and query a MERGE TABLE

Continuing with the connection to mdb3, we create a MERGE TABLE mrgT and add t1, t2 and t3 to it. Hence, mrgT contains three partitions: one local partition and two remote partitions. Selecting from mrgT returns the union of all data in its partition tables, without duplicate elimination. The logical query plan shows that the WHERE condition is properly pushed down to the workers.

sql>CREATE MERGE TABLE mrgT (s VARCHAR(10));
operation successful
sql>ALTER TABLE mrgT ADD TABLE t2;
operation successful
sql>ALTER TABLE mrgT ADD TABLE t1;
operation successful
sql>ALTER TABLE mrgT ADD TABLE t3;
operation successful
sql>SELECT * FROM mrgT WHERE s <> 'bla';
+------+
| s    |
+======+
| foo  |
| bar  |
| abc  |
| efg  |
| baz  |
| qux  |
+------+
6 tuples
sql>PLAN SELECT * FROM mrgT WHERE s <> 'bla';
+-----------------------------------------------------------------------------------+
| rel                                                                               |
+===================================================================================+
| union (                                                                           |
| | union (                                                                         |
| | | table                                                                         |
| | | | project (                                                                   |
| | | | | select (                                                                  |
| | | | | | REMOTE(sys.t2) [ "t2"."s" as "mrgt"."s" ] COUNT                         |
| | | | | ) [ "mrgt"."s" != varchar(10) "bla" ]                                     |
| | | | ) [ "mrgt"."s" ] REMOTE mapi:monetdb://localhost:60002/mdb2 [ "mrgt"."s" ], |
| | | table                                                                         |
| | | | project (                                                                   |
| | | | | select (                                                                  |
| | | | | | REMOTE(sys.t1) [ "t1"."s" as "mrgt"."s" ] COUNT                         |
| | | | | ) [ "mrgt"."s" != varchar(10) "bla" ]                                     |
| | | | ) [ "mrgt"."s" ] REMOTE mapi:monetdb://localhost:60001/mdb1 [ "mrgt"."s" ]  |
| | ) [ "mrgt"."s" ],                                                               |
| | project (                                                                       |
| | | select (                                                                      |
| | | | table(sys.t3) [ "t3"."s" as "mrgt"."s" ] COUNT                              |
| | | ) [ "mrgt"."s" != varchar(10) "bla" ]                                         |
| | ) [ "mrgt"."s" ]                                                                |
| ) [ "mrgt"."s" ]                                                                  |
+-----------------------------------------------------------------------------------+
21 tuples
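
Pushing the selection below the UNION ALL is safe because filtering distributes over a union: filtering each partition and then concatenating gives the same rows as concatenating first and filtering afterwards. The gain is that workers send back only the rows that pass the filter. The Python sketch below compares both strategies on the example data under a simplified cost model (an assumption): it counts rows shipped from remote partitions. Since s <> 'bla' removes nothing, a hypothetical, more selective predicate is included as well.

```python
from typing import Callable, List, Tuple

Row = Tuple[str]
# (rows, is_remote) for each partition of mrgT, in the order they were added.
PARTS: List[Tuple[List[Row], bool]] = [
    ([("foo",), ("bar",)], True),   # t2 on mdb2
    ([("abc",), ("efg",)], True),   # t1 on mdb1
    ([("baz",), ("qux",)], False),  # t3, local to mdb3
]


def filter_after_union(parts, pred: Callable[[Row], bool]):
    """Ship whole remote partitions to the master, then filter there."""
    shipped = sum(len(rows) for rows, remote in parts if remote)
    result = [r for rows, _ in parts for r in rows if pred(r)]
    return result, shipped


def filter_pushed_down(parts, pred: Callable[[Row], bool]):
    """Each instance filters its own partition; only survivors are shipped."""
    filtered = [([r for r in rows if pred(r)], remote) for rows, remote in parts]
    shipped = sum(len(rows) for rows, remote in filtered if remote)
    result = [r for rows, _ in filtered for r in rows]
    return result, shipped


def example_pred(r: Row) -> bool:
    return r[0] != "bla"  # WHERE s <> 'bla', as in the example


def selective_pred(r: Row) -> bool:
    return r[0] == "foo"  # hypothetical, more selective predicate


for pred in (example_pred, selective_pred):
    late, early = filter_after_union(PARTS, pred), filter_pushed_down(PARTS, pred)
    assert late[0] == early[0]  # same answer either way
    print(len(late[0]), "rows; shipped", late[1], "vs", early[1])
```
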

Example 4: join a MERGE TABLE with a REPLICA TABLE

Continuing with the connection to mdb3, we join repS and mrgT. Since no join condition is given, the result is the cross product of one set of values in repS (i.e. a single replica) and all values in mrgT.

sql>SELECT * FROM repS, mrgT;
+------+------+
| i    | s    |
+======+======+
|   23 | foo  |
|   23 | bar  |
|   23 | abc  |
|   23 | efg  |
|   42 | foo  |
|   42 | bar  |
|   42 | abc  |
|   42 | efg  |
|   23 | baz  |
|   23 | qux  |
|   42 | baz  |
|   42 | qux  |
+------+------+
12 tuples
sql>plan select * from repS, mrgT;
+--------------------------------------------------------------------------------------------------------------------+
| rel                                                                                                                |
+====================================================================================================================+
| union (                                                                                                            |
| | project (                                                                                                        |
| | | crossproduct (                                                                                                 |
| | | | table("sys"."s3") [ "s3"."i" UNIQUE MIN "23" MAX "42" NUNIQUES 2.000000 as "reps"."i",                       |
| | | |                     "s3"."%TID%" NOT NULL UNIQUE as "reps"."%TID%" ] COUNT 2,                                |
| | | | union (                                                                                                      |
| | | | | table (                                                                                                    |
| | | | | | REMOTE("sys"."t2") [ "t2"."s" as "mrgt"."s" ] REMOTE sys.t2                                              |
| | | | | ) [ "mrgt"."s" ],                                                                                          |
| | | | | table (                                                                                                    |
| | | | | | REMOTE("sys"."t1") [ "t1"."s" as "mrgt"."s" ] REMOTE sys.t1                                              |
| | | | | ) [ "mrgt"."s" ]                                                                                           |
| | | | ) [ "mrgt"."s" ]                                                                                             |
| | | ) [  ]                                                                                                         |
| | ) [ "reps"."i" NUNIQUES 2.000000 MIN "23" MAX "42", "mrgt"."s" ],                                                |
| | project (                                                                                                        |
| | | crossproduct (                                                                                                 |
| | | | table("sys"."s3") [ "s3"."i" UNIQUE MIN "23" MAX "42" NUNIQUES 2.000000 as "reps"."i",                       |
| | | |                     "s3"."%TID%" NOT NULL UNIQUE as "reps"."%TID%" ] COUNT 2,                                |
| | | | table("sys"."t3") [ "t3"."s" UNIQUE MIN "baz" MAX "qux" NUNIQUES 2.000000 as "mrgt"."s" ] COUNT 2            |
| | | ) [  ] COUNT 4                                                                                                 |
| | ) [ "reps"."i" NUNIQUES 2.000000 MIN "23" MAX "42", "mrgt"."s" NUNIQUES 2.000000 MIN "baz" MAX "qux" ] COUNT 4   |
| ) [ "reps"."i" MIN "23" MAX "42", "mrgt"."s" ]                                                                     |
+--------------------------------------------------------------------------------------------------------------------+
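
The plan above evaluates the cross product separately per branch of the union. This is valid because a cross product distributes over UNION ALL: repS × (t2 UNION ALL t1 UNION ALL t3) equals (repS × (t2 UNION ALL t1)) UNION ALL (repS × t3). The master can therefore pair the single local replica with the remote partitions and with the local partition independently. The Python sketch below checks that identity on the example data; it models bag semantics with plain lists and is an illustration, not MonetDB's executor.

```python
from collections import Counter
from itertools import product

repS = [(23,), (42,)]                  # the single replica that is read (s3)
remote_parts = [[("foo",), ("bar",)],  # t2 on mdb2
                [("abc",), ("efg",)]]  # t1 on mdb1
local_part = [("baz",), ("qux",)]      # t3 on mdb3


def cross(left, right):
    """Bag cross product of two lists of tuples."""
    return [a + b for a, b in product(left, right)]


# Direct: repS x (t2 UNION ALL t1 UNION ALL t3)
mrgT = [row for part in remote_parts for row in part] + local_part
direct = cross(repS, mrgT)

# As in the plan: (repS x (t2 UNION ALL t1)) UNION ALL (repS x t3)
remote_union = [row for part in remote_parts for row in part]
rewritten = cross(repS, remote_union) + cross(repS, local_part)

assert Counter(direct) == Counter(rewritten)  # same rows, same multiplicities
print(len(rewritten))  # 12
```

In this toy model the rewritten row order happens to coincide with the query output shown above, but SQL gives no row-order guarantee without ORDER BY.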