入门示例 01:实时计算买卖价差
计算目标
在量化交易中,因子计算是指根据一些预定义的规则或公式,从原始数据中提取出能够反映市场或个股特征的数值。本例中我们将以行情快照数据作为输入,对每一笔快照数据都响应一次,实时计算并输出买卖价差这一高频因子。
因子计算逻辑
行情数据中包含了买卖双方多档量价信息,买卖价差的定义为卖1价与买1价之差与均价之比:
其中 offerPrice0 、 bidPrice0 分别表示卖1价与买1价。
输入数据示例
动手实现
接下来我们会一步一步地在 DolphinDB 节点上模拟行情快照数据流和完成流式计算输出。完整脚本见附录。
创建作为输入的流数据表
首先创建一张共享流数据表 tick,用于存储和发布行情快照数据:
share(table=streamTable(1:0, `securityID`dateTime`bidPrice0`bidOrderQty0`offerPrice0`offerOrderQty0, [SYMBOL,TIMESTAMP,DOUBLE,LONG,DOUBLE,LONG]), sharedName=`tick)
执行以上语句后,节点内存中会有一张表 tick,暂时没有写入任何一条数据。
创建作为输出的流数据表
创建一张共享流数据表 resultTable,用于存储和发布因子计算结果:
share(table=streamTable(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE]), sharedName=`resultTable)
执行以上语句后,节点内存中会有一张表 resultTable ,暂时没有写入任何一条数据,我们希望将之后的计算结果实时写入到表 resultTable 中。
订阅流数据表
def factorCalFunc(msg){ ... } subscribeTable(tableName="tick", actionName="factorCal", offset=-1, handler=factorCalFunc, msgAsTable=true, batchSize=1, throttle=0.001)
前三行是一段伪代码,假设系统上定义有名为 factorCalFunc
的一元函数。执行第四行的
subscribeTable
函数,表示订阅流数据表 tick,并指定
factorCalFunc
为数据处理方法,并分配一个后台线程用于不断地处理订阅到的新数据。那么,每当表
tick 被插入一批数据时,数据都会被发布到订阅端,订阅端的后台线程会收到这部分新增的数据,并以此作为输入调用函数
factorCalFunc
。
-
tableName="tick" 表示订阅流数据表 tick
-
actionName="factorCal" 表示订阅任务的名称,用户自定义即可,同一个节点上 tableName 和 actionName 的组合必须唯一
-
offset=-1 表示从订阅之后表 tick 里新增的第一条数据开始消费
-
handler=factorCalFunc 表示对订阅到的数据的处理方式
-
msgAsTable=true 表示待处理的订阅到的数据是表,也就是这里
factorCalFunc
函数的入参 msg 是表 -
batchSize=1 和 throttle=0.001 共同指定了后台线程处理的频率,本例中的频率为任意有一条或多条待处理数据即调用一次处理函数
factorCalFunc
那么,接下来的问题是如何实现因子计算函数 factorCalFunc
,来计算每支股票的买卖价差。
定义计算逻辑
def factorCalFunc(msg){ tmp = select securityID, dateTime, (offerPrice0-bidPrice0)*2\(offerPrice0+bidPrice0) as factor from msg objByName("resultTable").append!(tmp) }
-
msg 可以看做一个与 tick 表结构相同的内存表
-
objByName("resultTable").append!(tmp) 表示往结果表 resultTable 插入 tmp 表
-
tmp 表通过 DolphinDB SQL 计算得到,(offerPrice0-bidPrice0)*2(offerPrice0+bidPrice0) 表示买卖价差公式,对 msg 表中的每一行记录都会计算出对应的买卖价差因子
模拟数据输入
那么,完整的流计算任务定义脚本如下:
// 创建作为输入的流数据表 share(table=streamTable(1:0, `securityID`dateTime`bidPrice0`bidOrderQty0`offerPrice0`offerOrderQty0, [SYMBOL,TIMESTAMP,DOUBLE,LONG,DOUBLE,LONG]), sharedName=`tick) // 创建作为输出的流数据表 share(table=streamTable(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE]), sharedName=`resultTable) go // 定义处理函数 def factorCalFunc(msg){ tmp = select securityID, dateTime, (offerPrice0-bidPrice0)*2\(offerPrice0+bidPrice0) as factor from msg objByName("resultTable").append!(tmp) } // 订阅流数据表 subscribeTable(tableName="tick", actionName="factorCal", offset=-1, handler=factorCalFunc, msgAsTable=true, batchSize=1, throttle=0.001)
执行上述脚本后,我们提交了对流数据表 tick 的订阅,并指定调用自定义函数 factorCalFunc
来处理收到的订阅数据。接下来,我们往流数据表里注入一些数据来观察订阅消费的效果。
insert into tick values(`000001, 2023.01.01T09:30:00.000, 19.98, 100, 19.99, 120) insert into tick values(`000001, 2023.01.01T09:30:03.000, 19.96, 130, 19.99, 120) insert into tick values(`000001, 2023.01.01T09:30:06.000, 19.90, 120, 20.00, 130)
插入若干条数据到表 tick 后,查看结果表 resultTable:
至此,我们通过发布订阅框架完成了一个流计算任务的开发,不断地往流数据表 tick 中插入数据将会不断地触发计算并输出因子到结果表。
进行运维
流计算运维
消息处理线程为后台线程,因此,系统提供了流计算运维函数来查看流计算的执行情况。除了使用函数查询外,在 2.00.11及以上版本中也可以在 web 界面的流计算监控模块中直接查看。
查看流订阅消费的状态:
getStreamingStat().subWorkers
执行以上代码后可以看到,节点上目前有一个订阅线程在工作,该订阅已经处理了 3 条消息,目前没有报错。订阅消费队列的最大深度为1000万,目前队列里没有待处理的数据。
流计算环境清理
当流计算任务运行一段时间后,或许我们需要下线计算任务、更改计算逻辑或者修改输入源的表结构,那么我们可能需要将订阅或者流数据表统统取消掉,可以使用以下系统函数。
-
取消订阅:
unsubscribeTable(tableName="tick", actionName="factorCal")
执行以上代码可以取消特定的订阅关系,之后表 tick 再发布数据,系统也不会计算主动成交量因子。
-
删除流数据表:
dropStreamTable(`tick)
附录
完整脚本
// 创建作为输入的流数据表 share(table=streamTable(1:0, `securityID`dateTime`bidPrice0`bidOrderQty0`offerPrice0`offerOrderQty0, [SYMBOL,TIMESTAMP,DOUBLE,LONG,DOUBLE,LONG]), sharedName=`tick) // 创建作为输出的流数据表 share(table=streamTable(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE]), sharedName=`resultTable) go // 定义处理函数 def factorCalFunc(msg){ tmp = select securityID, dateTime, (offerPrice0-bidPrice0)*2\(offerPrice0+bidPrice0) as factor from msg objByName("resultTable").append!(tmp) } // 订阅流数据表 subscribeTable(tableName="tick", actionName="factorCal", offset=-1, handler=factorCalFunc, msgAsTable=true, batchSize=1, throttle=0.001) go // 模拟数据输入 insert into tick values(`000001, 2023.01.01T09:30:00.000, 19.98, 100, 19.99, 120) insert into tick values(`000001, 2023.01.01T09:30:03.000, 19.96, 130, 19.99, 120) insert into tick values(`000001, 2023.01.01T09:30:06.000, 19.90, 120, 20.00, 130)