入门示例 01:实时计算买卖价差

计算目标

在量化交易中,因子计算是指根据一些预定义的规则或公式,从原始数据中提取出能够反映市场或个股特征的数值。本例中我们将以行情快照数据作为输入,对每一笔快照数据都响应一次,实时计算并输出买卖价差这一高频因子。

因子计算逻辑

行情数据中包含了买卖双方多档量价信息,买卖价差的定义为卖1价与买1价之差与均价之比:



其中 offerPrice0 、 bidPrice0 分别表示卖1价与买1价。

输入数据示例



动手实现

接下来我们会一步一步地在 DolphinDB 节点上模拟行情快照数据流和完成流式计算输出。完整脚本见附录。

创建作为输入的流数据表

首先创建一张共享流数据表 tick,用于存储和发布行情快照数据:

share(table=streamTable(1:0, `securityID`dateTime`bidPrice0`bidOrderQty0`offerPrice0`offerOrderQty0, [SYMBOL,TIMESTAMP,DOUBLE,LONG,DOUBLE,LONG]), sharedName=`tick)

执行以上语句后,节点内存中会有一张表 tick,暂时没有写入任何一条数据。

创建作为输出的流数据表

创建一张共享流数据表 resultTable,用于存储和发布因子计算结果:

share(table=streamTable(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE]), sharedName=`resultTable)

执行以上语句后,节点内存中会有一张表 resultTable ,暂时没有写入任何一条数据,我们希望将之后的计算结果实时写入到表 resultTable 中。

订阅流数据表

def factorCalFunc(msg){
  ...
}
subscribeTable(tableName="tick", actionName="factorCal", offset=-1, handler=factorCalFunc, msgAsTable=true, batchSize=1, throttle=0.001)

前三行是一段伪代码,假设系统上定义有名为 factorCalFunc 的一元函数。执行第四行的 subscribeTable 函数,表示订阅流数据表 tick,并指定 factorCalFunc 为数据处理方法,并分配一个后台线程用于不断地处理订阅到的新数据。那么,每当表 tick 被插入一批数据时,数据都会被发布到订阅端,订阅端的后台线程会收到这部分新增的数据,并以此作为输入调用函数 factorCalFunc

  • tableName="tick" 表示订阅流数据表 tick

  • actionName="factorCal" 表示订阅任务的名称,用户自定义即可,同一个节点上 tableName 和 actionName 的组合必须唯一

  • offset=-1 表示从订阅之后表 tick 里新增的第一条数据开始消费

  • handler=factorCalFunc 表示对订阅到的数据的处理方式

  • msgAsTable=true 表示待处理的订阅到的数据是表,也就是这里 factorCalFunc 函数的入参 msg 是表

  • batchSize=1 和 throttle=0.001 共同指定了后台线程处理的频率,本例中的频率为任意有一条或多条待处理数据即调用一次处理函数 factorCalFunc

那么,接下来的问题是如何实现因子计算函数 factorCalFunc,来计算每支股票的买卖价差。

定义计算逻辑

def factorCalFunc(msg){
	tmp = select securityID, dateTime, (offerPrice0-bidPrice0)*2\(offerPrice0+bidPrice0) as factor from msg
	objByName("resultTable").append!(tmp)	
}
  • msg 可以看做一个与 tick 表结构相同的内存表

  • objByName("resultTable").append!(tmp) 表示往结果表 resultTable 插入 tmp

  • tmp 表通过 DolphinDB SQL 计算得到,(offerPrice0-bidPrice0)*2(offerPrice0+bidPrice0) 表示买卖价差公式,对 msg 表中的每一行记录都会计算出对应的买卖价差因子

模拟数据输入

那么,完整的流计算任务定义脚本如下:

// 创建作为输入的流数据表
share(table=streamTable(1:0, `securityID`dateTime`bidPrice0`bidOrderQty0`offerPrice0`offerOrderQty0, [SYMBOL,TIMESTAMP,DOUBLE,LONG,DOUBLE,LONG]), sharedName=`tick)
// 创建作为输出的流数据表
share(table=streamTable(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE]), sharedName=`resultTable)
go
// 定义处理函数
def factorCalFunc(msg){
	tmp = select securityID, dateTime, (offerPrice0-bidPrice0)*2\(offerPrice0+bidPrice0) as factor from msg
	objByName("resultTable").append!(tmp)	
}
// 订阅流数据表
subscribeTable(tableName="tick", actionName="factorCal", offset=-1, handler=factorCalFunc, msgAsTable=true, batchSize=1, throttle=0.001)

执行上述脚本后,我们提交了对流数据表 tick 的订阅,并指定调用自定义函数 factorCalFunc 来处理收到的订阅数据。接下来,我们往流数据表里注入一些数据来观察订阅消费的效果。

insert into tick values(`000001, 2023.01.01T09:30:00.000, 19.98, 100, 19.99, 120)
insert into tick values(`000001, 2023.01.01T09:30:03.000, 19.96, 130, 19.99, 120)
insert into tick values(`000001, 2023.01.01T09:30:06.000, 19.90, 120, 20.00, 130)

插入若干条数据到表 tick 后,查看结果表 resultTable



至此,我们通过发布订阅框架完成了一个流计算任务的开发,不断地往流数据表 tick 中插入数据将会不断地触发计算并输出因子到结果表。

进行运维

流计算运维

消息处理线程为后台线程,因此,系统提供了流计算运维函数来查看流计算的执行情况。除了使用函数查询外,在 2.00.11及以上版本中也可以在 web 界面的流计算监控模块中直接查看。

查看流订阅消费的状态:

getStreamingStat().subWorkers

执行以上代码后可以看到,节点上目前有一个订阅线程在工作,该订阅已经处理了 3 条消息,目前没有报错。订阅消费队列的最大深度为1000万,目前队列里没有待处理的数据。



流计算环境清理

当流计算任务运行一段时间后,或许我们需要下线计算任务、更改计算逻辑或者修改输入源的表结构,那么我们可能需要将订阅或者流数据表统统取消掉,可以使用以下系统函数。

  • 取消订阅:

    unsubscribeTable(tableName="tick", actionName="factorCal")

    执行以上代码可以取消特定的订阅关系,之后表 tick 再发布数据,系统也不会计算主动成交量因子。

  • 删除流数据表:

    dropStreamTable(`tick)

附录

完整脚本

// 创建作为输入的流数据表
share(table=streamTable(1:0, `securityID`dateTime`bidPrice0`bidOrderQty0`offerPrice0`offerOrderQty0, [SYMBOL,TIMESTAMP,DOUBLE,LONG,DOUBLE,LONG]), sharedName=`tick)
// 创建作为输出的流数据表
share(table=streamTable(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE]), sharedName=`resultTable)
go
// 定义处理函数
def factorCalFunc(msg){
	tmp = select securityID, dateTime, (offerPrice0-bidPrice0)*2\(offerPrice0+bidPrice0) as factor from msg
	objByName("resultTable").append!(tmp)	
}
// 订阅流数据表
subscribeTable(tableName="tick", actionName="factorCal", offset=-1, handler=factorCalFunc, msgAsTable=true, batchSize=1, throttle=0.001)
go
// 模拟数据输入
insert into tick values(`000001, 2023.01.01T09:30:00.000, 19.98, 100, 19.99, 120)
insert into tick values(`000001, 2023.01.01T09:30:03.000, 19.96, 130, 19.99, 120)
insert into tick values(`000001, 2023.01.01T09:30:06.000, 19.90, 120, 20.00, 130)