查询数据

DolphinDB 支持大多数标准 SQL 语法,我们可以使用 SQL 语句查询数据库中的数据。

db=database("dfs://rangedb")
pt=loadTable(db,`pt)
select count(*) from pt;

得到:

count
1000000

与传统的数据库不同,DolphinDB 是集数据库、编程语言和分布式计算于一体的系统。数据库和表在 DolphinDB 中是一个普通变量,并不存在独立的保留空间。因此,每次访问数据库或表时都要使用 databaseloadTable 函数将它们赋予到变量中。

分布式查询(即查询分布式数据表)和普通查询的语法并无差异。理解分布式查询的工作原理有助于写出高效的查询。系统首先根据 where 子句确定需要的分区,然后重写查询,并把新的查询发送到相关分区所在的位置,最后整合所有分区的结果。

分区剪枝

绝大多数分布式查询只涉及分布式表的部分分区。当数据量较大时,过滤条件的不同写法会造成查询耗时的巨大差异。当查询分布式表时,过滤条件满足以下要求,则可以进行分区剪枝。

  • 当查询采用 VALUE 分区、RANGE 分区或 LIST 分区的分布式表时,若 where 子句中某个过滤条件同时满足以下条件,则系统只加载与查询相关的分区,以节省查询耗时:

    • 仅包含分布式表的原始分区字段、使用关系运算符(<, <=,=, ==, >, >=, in, between)或逻辑运算符(or, and),以及常量(包括常量与常量的运算)
    • 非链式条件(例如100<x<200)
    • 过滤逻辑可以缩窄相关分区范围(下面的例子中有说明)。
  • 当查询采用 HASH 分区的分布式表时,若 where 子句中某个过滤条件满足以下条件,则系统会进行分区剪枝:
    • 包含分布式表的原始分区字段,使用关系运算符(=, ==, in, between)或逻辑运算符(or, and),以及常量(包括常量与常量的运算)。注意:当分区字段是 STRING 类型时,使用 between 运算符不能进行剪枝。

若 where 子句中的过滤条件不满足以上要求,则会遍历所有分区进行查询。

下面的例子可以帮助理解 DolphinDB 如何缩窄相关分区范围。

n=10000000
id=take(1..1000, n).sort()
date=1989.12.31+take(1..365, n)
announcementDate = date+rand(5, n)
x=rand(1.0, n)
y=rand(10, n)
t=table(id, date, announcementDate, x, y)
db=database("dfs://rangedb1", RANGE, [1990.01.01, 1990.03.01, 1990.05.01, 1990.07.01, 1990.09.01, 1990.11.01, 1991.01.01])
pt = db.createPartitionedTable(t, `pt, `date)
pt.append!(t);

以下类型的查询可以在加载和处理数据前缩小分区范围:

x = select max(x) from pt where date>1990.12.01-10;
// 系统确定,只有一个分区与查询相关:[1990.11.01, 1991.01.01)。
select max(x) from pt where date between 1990.08.01:1990.12.01 group by date;
// 系统确定,只有三个分区与查询相关:[1990.07.01, 1990.09.01)、[1990.09.01, 1990.11.01)和[1990.11.01, 1991.01.01)。
select max(x) from pt where y<5 and date between 1990.08.01:1990.08.31;
// 系统确定,只有一个分区与查询相关:[1990.07.01, 1990.09.01)。注意,系统忽略了y<5的条件。加载了相关分区后,系统会根据y<5的条件进一步筛选数据。

对于时间类型的分区列应用时间精度更低的函数,也可以缩小分区范围。时间精度由高到低的排序为:

  • NANOTIMESTAMP > TIMESTAMP > DATETIME> DATEHOUR> DATE> MONTH> YEAR
  • TIME> SECOND > MINUTE> HOUR

上例的时间类型是 DATE,则对其应用 monthyear 函数后,也可以进行分区剪枝,见如下代码:

select max(x) from pt where month(date)>=1990.12M;
// 系统确定,只有一个分区与查询相关:[1990.11.01, 1991.01.01)。

以下查询不能确定相关分区。如果 pt 是数据量非常大的分区表,查询会耗费大量时间,因此应该尽量避免以下写法。

select max(x) from pt where date+30>2019.12.01;
//不可对分区字段进行运算

select max(x) from pt where 2019.12.01<date<2019.12.31;
//不可使用链式比较

select max(x) from pt where y<5;
//至少有一个过滤条件需要使用分区字段

select max(x) from pt where date<announcementDate-3;
//与分区字段比较时仅可使用常量,不可使用其他列

select max(x) from pt where y<5 or date between 1990.08.01:1990.08.31;
//由于必须执行y<5,过滤逻辑无法缩窄相关分区范围

分组子句

group by

GROUP BY 是一种用于将数据进行分组并对每个组应用聚合函数的 SQL 语句。

例如根据成交数据生成5分钟K线,运用 group by 子句以及内置函数 dailyAlignedBar 可以便捷地处理A股市场每天的两个交易时段:

barMinutes = 5
sessionsStart=09:30:00.000 13:00:00.000
OHLC = select first(TradePrice) as open, max(TradePrice) as high, min(TradePrice) as low, last(TradePrice) as close, sum(TradeQty) as volume from trade where date(TradeTime) = 2021.04.26 group by securityid, dailyAlignedBar(TradeTime, sessionsStart, barMinutes*60*1000) as barStart 

context by

context by 是 DolphinDB 的独有功能,是对标准 SQL 语句的拓展。与 group by 类似,都对数据进行分组。但是,用 group by 时,每一组返回一个标量值,而用 context by 时,每一组返回一个和组内元素数量相同的向量。group by 只能配合聚合函数使用,而 context by 既可以配合聚合函数使用,也可以与移动窗口函数或累积函数等其它函数结合使用。

例如计算时间加权订单斜率:

@state
def timeWeightedOrderSlope(bid,bidQty,ask,askQty,lag=20){
	return (log(iif(ask==0,bid,ask))-log(iif(bid==0,ask,bid)))\(log(askQty)-log(bidQty)).ffill().mavg(lag, 1).nullFill(0)
}

// 通过 context by 分组计算
res=select SecurityID,DateTime,timeWeightedOrderSlope(bidPrice[0],bidOrderQty[0],OfferPrice[0],OfferOrderQty[0])  as TimeWeightedOrderSlope
	from  loadTable("dfs://snapshotdb","snapshottb") where date(DateTime)=idate context by SecurityID csort DateTime map

pivot by

pivot by 是 DolphinDB 的独有功能,是对标准 SQL 语句的拓展。它将表中一列或多列的内容按照两个维度重新排列,亦可配合数据转换函数使用。 与 select 子句一起使用时返回一个表,而和 exec 语句一起使用时返回一个矩阵。若重新排列后存在行维度存在多个相同值,则会进行去重,只保留最后一个值。

当因子以窄表模式存储,可使用 pivot by 在查询时将数据转换为面板数据,进而用于量化交易中的程序计算。例如,对一个包含时间戳、股票代码、因子名和因子值四列的窄表,可使用以下查询转化为面板数据:

t = select factorValue from loadTable("dfs://factordb","factortb") where tradetime >=2023.12.01 and tradetime <= 2023.12.31 and factorname = specificFactorName  pivot by tradetime,securityid,factorname

连接分布式表

连接分布式表时,若不使用 select, where 等过滤语句,返回的表会包含过多冗余数据。因此,DolphinDB 只能在 SQL 语句中进行分布式表连接。有关分布式连接支持的连接方式,参考: 表连接 及其下属连接方式页面。

分布式表连接时,需要遵循以下规则:

  1. 分布式表 asof join 或 window join 时,必须提供至少两个连接列。
  2. 当分布式表与维度表或内存表连接时,系统会将维度表或内存表复制到分布式表所在的各个节点上执行连接操作。如果本地表数据量非常庞大,表的传送将非常耗时。为了提高性能,系统在数据复制之前用 where 条件尽可能多地过滤内存表。如果右表数据量太大,会影响查询速度,所以在实际应用中,右表的数据量最好比较小。

以下为连接分布式表的示例:

创建数据库的脚本如下:

dates=2019.01.01..2019.01.31
syms="A"+string(1..30)
sym_range=cutPoints(syms,3)
db1=database("",VALUE,dates)
db2=database("",RANGE,sym_range)
db=database("dfs://stock",COMPO,[db1,db2])
n=10000
datetimes=2019.01.01T00:00:00..2019.01.31T23:59:59
t=table(take(datetimes,n) as trade_time,take(syms,n) as sym,rand(1000,n) as qty,rand(500.0,n) as price)
trades=db.createPartitionedTable(t,`trades,`trade_time`sym).append!(t)

n=200
t2=table(take(datetimes,n) as trade_time,take(syms,n) as sym,rand(500.0,n) as bid,rand(500.0,n) as offer)
quotes=db.createPartitionedTable(t2,`quotes,`trade_time`sym).append!(t2)

t3=table(syms as sym,take(0 1,30) as type)
infos=db.createTable(t3,`infos).append!(t3)

例1. 通过以下语句连接两个分布式表 trades 和 quotes:

select * from ej(trades,quotes,`trade_time`sym);

得到:

trade_time sym qty price bid offer
2019.01.01T00:00:00 A1 39 7.366735 37.933525 446.917644
2019.01.01T00:00:09 A10 15 461.381014 405.092702 26.659516
2019.01.01T00:00:10 A11 987 429.981704 404.289413 347.64917
2019.01.01T00:00:11 A12 266 60.466206 420.426175 83.538043
2019.01.01T00:00:12 A13 909 362.057769 324.886047 162.502655
2019.01.01T00:00:13 A14 264 113.964472 497.598722 103.114702
2019.01.01T00:00:14 A15 460 347.518325 24.584629 357.854207
2019.01.01T00:00:15 A16 196 258.889177 49.467399 13.974672
2019.01.01T00:00:16 A17 198 403.564922 428.539984 208.410852
2019.01.01T00:00:17 A18 30 288.469046 41.905556 378.080141
...

例2. 通过以下语句连接分布式表和维度表:

select * from lj(trades,infos,`sym);

得到:

trade_time sym qty price type
2019.01.01T00:00:00 A1 856 359.809918 0
2019.01.01T00:00:09 A10 368 305.801702 1
2019.01.01T00:00:10 A11 549 447.406744 0
2019.01.01T00:00:11 A12 817 115.613373 1
2019.01.01T00:00:12 A13 321 298.317481 0
2019.01.01T00:00:13 A14 3 2.289171 1
2019.01.01T00:00:14 A15 586 91.841629 0
2019.01.01T00:00:15 A16 745 43.256142 1
2019.01.01T00:00:16 A17 60 0.153205 0
...

例3. 通过以下语句连接分布式表和内存表:

tmp=table("A"+string(1..15) as sym,2019.01.11..2019.01.25 as date);
select * from ej(trades,tmp,`sym);

得到:

trade_time sym qty price date
2019.01.01T00:00:00 A1 856 359.809918 2019.01.11
2019.01.01T00:00:09 A10 368 305.801702 2019.01.20
2019.01.01T00:00:10 A11 549 447.406744 2019.01.21
2019.01.01T00:00:11 A12 817 115.613373 2019.01.22
2019.01.01T00:00:12 A13 321 298.317481 2019.01.23
2019.01.01T00:00:13 A14 3 2.289171 2019.01.24
2019.01.01T00:00:14 A15 586 91.841629 2019.01.25
2019.01.01T00:00:30 A1 390 325.407485 2019.01.11
...

查询优化

DolphinDB 在分布式表上执行 SQL 操作时,会在每个分区中并发执行 SQL 语句。在分区剪枝场景下,某些分区内的数据完全涵盖在过滤条件筛选的数据范围内;此时在该分区内执行这条过滤语句没有意义,且增加了分区扫描的开销。DolphinDB 支持在分区的子查询上删除无意义的过滤条件,从而达到查询性能的优化。

select max(x) from pt where date between 1990.08.21:1990.12.25;

由于该数据库按照两个月为一个范围进行分区,因此只需要在1990.07.01-08.31分区和1990.11.01-12.31两个分区内执行该 select 语句中的 between 语句筛选符合条件的数据,1990.09.01-10.31这个分区的子查询内可以删除 between 的筛选操作。

最佳实践

  • 查询时,尽量将查询结果赋值给一个变量,即用 data = select * from pt 替代 select * from pt,二者区别在于:
    • 前者会将数据从分布式表查询到内存。
    • 后者会将数据查出后返回给客户端。如果数据量很大,传输中可能会造成服务端网络拥塞,且会感觉查询很慢,这是由于大量时间耗费在网络传输过程中。
  • 查询时,应尽可能使用 WHERE 子句对分区字段进行过滤。如果不指定,会查询全表数据,在数据量很大时,CPU、内存、磁盘IO的资源都会迅速耗尽。
  • 如果库表使用的是 TSDB 存储引擎,在 WHERE 子句中指定分区字段后,进一步指定 sortKey 可以提高查询效率。
  • 一次查询的数据数量上限是 20 亿,如需查询超过 20 亿条的数据,要分批查询。
  • 一次查询涉及的分区数不能太多,默认是65536。如果遇到涉及分区数超过此限制的情况,可以通过修改配置参数 maxPartitionNumPerQuery 的值放宽限制,也可以通过增加过滤条件减小涉及的分区数,通常更建议第二种措施。

count 在 SQL 中三种常见的用法:

  • select count(*) from pt:返回 pt 中的数据条数。
  • select count(1) from pt:若 pt 为空表,则返回1;否则返回 pt 表中数据涉及到的分区数量。
  • select count(colName) from pt 返回列 colName 中的非空数据的个数。