Queries

A distributed query is a SQL query on a partitioned table, and it looks the same as a regular query. Understanding how distributed queries are executed helps users write efficient ones. The system first determines all relevant partitions based on the where clause; it then rewrites the query and sends the new query to the nodes where the relevant partitions reside; finally, it merges the results from all relevant partitions.

Partition Pruning

Most distributed queries do not need all partitions of a DFS table. The system can save a significant amount of time by loading only the partitions relevant to the query.

DolphinDB performs partition pruning in the following situations:

  • If a DFS table is partitioned by VALUE, RANGE or LIST, partition pruning is performed when a where condition meets all of the following:

    • only includes the partitioning column (not used in a calculation or function) of the DFS table, relational operators (<, <=, =, ==, >, >=, in, between), logical operators (or, and), and constants (including operations between constants);

    • does not use chained comparisons (such as 100<x<200);

    • narrows down the relevant partitions. (See the following examples)

  • If a DFS table is partitioned by HASH, partition pruning is performed when a where condition includes a partitioning column (not used in a calculation or function) of the DFS table, relational operators (<, <=, =, ==, >, >=, in, between), logical operators (or, and), and constants (including operations between constants). Note that when a partitioning column is of STRING type and the between operator is used, partition pruning is not performed.

n=10000000
id=take(1..1000, n).sort()
date=1989.12.31+take(1..365, n)
announcementDate = date+rand(5, n)
x=rand(1.0, n)
y=rand(10, n)
t=table(id, date, announcementDate, x, y)
db=database("dfs://rangedb1", RANGE, [1990.01.01, 1990.03.01, 1990.05.01, 1990.07.01, 1990.09.01, 1990.11.01, 1991.01.01])
pt = db.createPartitionedTable(t, `pt, `date)
pt.append!(t);

The where conditions specified in the following SQL queries can narrow down relevant partitions:

x = select max(x) from pt where date>1990.12.01-10;

Only 1 partition [1990.11.01, 1991.01.01) is relevant to this query.

select max(x) from pt where date between 1990.08.01:1990.12.01 group by date;

Only 3 partitions ([1990.07.01, 1990.09.01), [1990.09.01, 1990.11.01) and [1990.11.01, 1991.01.01)) are relevant to this query.

select max(x) from pt where y<5 and date between 1990.08.01:1990.08.31;

Only 1 partition [1990.07.01, 1990.09.01) is relevant to this query. Note that the filtering condition y<5 is ignored in the partition pruning. The system will further filter the data by condition y<5 after relevant partitions are loaded.
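The partition counts quoted for these three queries can be reproduced with a small model. The following Python sketch is not DolphinDB code and makes no claim about how DolphinDB implements pruning internally. It only assumes that partition i covers the half-open range [BOUNDS[i], BOUNDS[i+1]) and that a partition is relevant when it overlaps the queried date range. The names BOUNDS and relevant_partitions are hypothetical.

```python
from datetime import date, timedelta

# Range boundaries copied from the rangedb1 example above.
BOUNDS = [date(1990, 1, 1), date(1990, 3, 1), date(1990, 5, 1), date(1990, 7, 1),
          date(1990, 9, 1), date(1990, 11, 1), date(1991, 1, 1)]

def relevant_partitions(lo=None, hi=None):
    """Return the partitions [start, end) that overlap the closed date range [lo, hi].

    None means the range is unbounded on that side.
    """
    hits = []
    for start, end in zip(BOUNDS, BOUNDS[1:]):
        if lo is not None and end <= lo:   # partition ends before the range starts
            continue
        if hi is not None and start > hi:  # partition starts after the range ends
            continue
        hits.append((start, end))
    return hits

one_day = timedelta(days=1)

# date > 1990.12.01-10, i.e. date >= 1990.11.22 at day resolution
q1 = relevant_partitions(lo=date(1990, 12, 1) - timedelta(days=10) + one_day)
# date between 1990.08.01:1990.12.01
q2 = relevant_partitions(lo=date(1990, 8, 1), hi=date(1990, 12, 1))
# y<5 and date between 1990.08.01:1990.08.31 (y<5 plays no part in pruning)
q3 = relevant_partitions(lo=date(1990, 8, 1), hi=date(1990, 8, 31))

print(len(q1), len(q2), len(q3))  # 1 3 1
```

Only constants and the partitioning column enter the overlap test, which is why a condition on another column such as y<5 cannot reduce the set of partitions.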

Partitions can be narrowed down when applying lower-precision functions to partitioning columns of temporal types. The temporal types are sorted in descending order of precision:

  • NANOTIMESTAMP > TIMESTAMP > DATETIME > DATEHOUR > DATE > MONTH > YEAR

  • TIME > SECOND > MINUTE > HOUR

The partitioning column in the above example is of DATE type, so partition pruning can be achieved if function month or year is applied to it:

select max(x) from pt where month(date)>=1990.12M;

Only 1 partition [1990.11.01, 1991.01.01) is relevant to this query.

The following queries cannot narrow down the relevant partitions. If used on a huge partitioned table, they will take a long time to finish. For this reason these queries should be avoided.

select max(x) from pt where date+30>2019.12.01;
// the partitioning column is involved in a calculation: cannot narrow down relevant partitions

select max(x) from pt where 2019.12.01<date<2019.12.31;
// chained comparison: cannot narrow down relevant partitions

select max(x) from pt where y<5;
// no partitioning column is used: cannot narrow down relevant partitions

select max(x) from pt where date<announcementDate-3;
// the partitioning column is compared to another column: cannot narrow down relevant partitions

select max(x) from pt where y<5 or date between 1990.08.01:1990.08.31;
// because of the "or" with y<5, the filtering logic cannot narrow down relevant partitions

Query optimization

When a SQL query is executed on a partitioned table, it is executed on each relevant partition in parallel. After partition pruning, all the data in some of the remaining partitions may already fall within the range of the query's conditions, so filtering those partitions again is redundant. DolphinDB removes such redundant filtering conditions to improve query performance.

select max(x) from pt where date between 1990.08.21:1990.12.25;

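Since the database is partitioned into two-month ranges, the query touches three partitions: "1990.07.01-08.31", "1990.09.01-10.31" and "1990.11.01-12.31". The filter triggered by the "between" condition only needs to be executed on the two boundary partitions "1990.07.01-08.31" and "1990.11.01-12.31". On partition "1990.09.01-10.31", where every row already satisfies the condition, the filtering operation can be removed.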
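The idea can be checked with a short Python model. It is not DolphinDB code and does not describe DolphinDB's internal optimizer. It assumes only that a partition whose whole date range lies inside the queried range needs no row-level filter, while a partition that overlaps the range only partly still does. The names BOUNDS and classify are hypothetical.

```python
from datetime import date, timedelta

# Range boundaries of the rangedb1 example; partition i covers [BOUNDS[i], BOUNDS[i+1]).
BOUNDS = [date(1990, 1, 1), date(1990, 3, 1), date(1990, 5, 1), date(1990, 7, 1),
          date(1990, 9, 1), date(1990, 11, 1), date(1991, 1, 1)]

def classify(lo, hi):
    """Split the partitions overlapping the closed range [lo, hi] into two groups.

    full:    every date in the partition is inside [lo, hi], so the filter is redundant
    partial: only some dates are inside [lo, hi], so the filter must still run
    Partitions are reported as (first_day, last_day).
    """
    full, partial = [], []
    for start, end in zip(BOUNDS, BOUNDS[1:]):
        last_day = end - timedelta(days=1)
        if last_day < lo or start > hi:
            continue  # pruned: no overlap with the queried range
        if lo <= start and last_day <= hi:
            full.append((start, last_day))
        else:
            partial.append((start, last_day))
    return full, partial

# date between 1990.08.21:1990.12.25
full, partial = classify(date(1990, 8, 21), date(1990, 12, 25))
print("filter not needed:", full)
print("filter needed:", partial)
```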

Implementation of Distributed Queries with MapReduce

There are two situations when the system has to rewrite a distributed query. One is when we use the order by clause. The other situation is when we use aggregate functions and a grouping column is not the partitioning column.

When the partitioning column is the first grouping column, the implementation is straightforward. The system simply executes the query on each relevant partition and then merges the individual query results.

When the partitioning column is not the first grouping column, the system uses the MapReduce method to execute the distributed query. It first looks up the MapReduce (mapr) definitions of the aggregate functions in the query. It then rewrites the original query into a map query based on those definitions and sends the map query to each involved partition for execution. Finally, it executes the reduce query to merge the results.

select avg(x) from t where id>200 and id<900 group by date;
// the partition column of table t is column id rather than column date

For the example above, the map query would conduct the following operation:

tempTable = select sum(x) as col1, count(x) as col2 from t where id>200 and id<900 group by date;

The reduce query would conduct the following operation:

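select sum(col1)\sum(col2) as avg_x from tempTable group by date;
// col1 holds partial sums and col2 holds partial counts, so the average is the total sum divided by the total count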
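The map and reduce steps for an average can be modeled outside the database. The Python sketch below is illustrative only: the rows and the function names map_query and reduce_query are hypothetical, and the code makes no claim about DolphinDB's internal implementation. It shows that per-partition sums and counts, grouped by date, are enough to recover the exact average per date.

```python
from collections import defaultdict

# Hypothetical rows (id, date, x) held by two partitions of a table partitioned on id.
partitions = [
    [(201, "1990.01.01", 1.0), (202, "1990.01.02", 4.0)],
    [(501, "1990.01.01", 2.0), (502, "1990.01.01", 6.0), (503, "1990.01.02", 8.0)],
]

def map_query(rows):
    """Map step, run on one partition: sum and count of x per date for 200 < id < 900."""
    acc = defaultdict(lambda: [0.0, 0])
    for row_id, d, x in rows:
        if 200 < row_id < 900:
            acc[d][0] += x
            acc[d][1] += 1
    return acc

def reduce_query(partials):
    """Reduce step: add up the partial sums and counts, then divide once per date."""
    totals = defaultdict(lambda: [0.0, 0])
    for acc in partials:
        for d, (s, c) in acc.items():
            totals[d][0] += s
            totals[d][1] += c
    return {d: s / c for d, (s, c) in totals.items()}

avg_x = reduce_query(map_query(p) for p in partitions)
print(avg_x)  # {'1990.01.01': 3.0, '1990.01.02': 6.0}
```

Averaging the per-partition averages directly would give a different answer whenever partitions contribute different row counts to a date, which is why the map step carries the counts along.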

Not all distributed queries can be rewritten this way. One example is the calculation of the median over a distributed table. We will discuss this in the section on Distributed Computing.
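A small counterexample shows why the median does not fit the map-and-reduce pattern. The Python sketch below uses hypothetical data and is not DolphinDB code. It compares the median of per-partition medians with the median of all values.

```python
from statistics import median

# Hypothetical values of one column spread over two partitions.
partitions = [[1, 2, 3], [4, 5, 6, 7, 100]]

per_partition = [median(p) for p in partitions]      # one median per partition
merged = median(per_partition)                       # naive "reduce" over partial medians
exact = median([v for p in partitions for v in p])   # median over all values

print(merged, exact)  # 4.0 4.5
```

The partial medians discard the information needed to find the global middle value, so no reduce step over them can be exact in general.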

The system knows whether a built-in function is an aggregate function. However, when users define their own aggregate functions, they must use the keyword defg rather than def to tell the system that the function is an aggregate function. If an aggregate function is defined with the keyword def and then used in a distributed query, the query may return wrong results or throw exceptions.

DolphinDB allows user-defined functions (UDF) and user-defined aggregate functions (UDAF) in SQL queries. Users can simply define a function and then use it in a query on the fly. There is no need to compile or deploy it. The implementation in a distributed query, however, is slightly different from the implementation in an ordinary query. The system automatically checks distributed queries for UDFs and UDAFs. If it detects a UDF or UDAF, it serializes the function and the UDFs or UDAFs it depends on and sends them to the remote sites together with the query. This checking and serialization process is invisible to users. It is one of the unique features DolphinDB offers in comparison with other systems.

DolphinDB does not support the use of aggregate functions such as sum or count in the where clause of distributed queries. This is because distributed queries use the where clause to select the relevant partitions before executing aggregate functions with the MapReduce method. If aggregate functions appear in the where clause, distributed queries cannot select the relevant partitions and will fail. To use an aggregate value in the where clause of a distributed query, first run separate distributed queries that calculate the aggregate values and assign them to variables, and then reference these variables in the original distributed query.
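The two-step workaround can be pictured with a Python model. The data and names below are hypothetical and this is not DolphinDB code. The first pass computes the aggregate across all partitions and stores it in a variable. The second pass filters each partition against that variable, which by then is an ordinary constant.

```python
# Hypothetical values of column x in two partitions.
partitions = {"p1": [1.0, 5.0], "p2": [3.0, 7.0, 9.0]}

# Query 1: compute the aggregate over all partitions and keep it in a variable.
total = sum(sum(rows) for rows in partitions.values())
count = sum(len(rows) for rows in partitions.values())
avg_x = total / count

# Query 2: the where condition compares x with a constant, so each partition
# can be filtered independently.
above_avg = {name: [x for x in rows if x > avg_x] for name, rows in partitions.items()}

print(avg_x, above_avg)  # 5.0 {'p1': [], 'p2': [7.0, 9.0]}
```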

DolphinDB SQL statements support the use of variables that are defined outside the SQL query. For distributed SQL queries, the system automatically copies these variables from the local node to the remote nodes that need them. This is an advantage of DolphinDB compared to other systems.

Please refer to the index for more SQL information.