股票行情回放
一个量化策略在生产(交易)环境中运行时,实时数据的处理通常是由事件驱动的。为确保研发和生产使用同一套代码,通常在研发阶段需将历史数据,严格按照事件发生的时间顺序进行回放,以此模拟交易环境。一个交易所的行情数据通常包括逐笔委托、逐笔成交、快照等多种类型的数据。DolphinDB 提供了严格按照时间顺序将多个不同数据源同时进行回放的功能。
首先简要介绍 DolphinDB 数据回放功能和原理,之后介绍股票行情回放的应用方案和代码实现,最后给出搭建行情回放服务的最佳实践。
1. 单表回放
DolphinDB 历史数据回放功能通过 replay 函数实现。replay 函数的作用是将内存表或数据库表中的记录以一定的速率写入到目标表中,以模拟实时注入的数据流。根据输入表的数量,回放分为单表回放和多表回放,其中单表回放是最基础的回放模式,即将一个输入表回放至一个相同表结构的目标表中,以下是不包括建表语句的单表回放示例:
tradeDS = replayDS(sqlObj=<select * from loadTable("dfs://trade", "trade") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time) replay(inputTables=tradeDS, outputTables=tradeStream, dateColumn=`Date, timeColumn=`Time, replayRate=10000, absoluteRate=true)
以上脚本将数据库 "dfs://trade" 中的 "trade" 表中 2020 年 12 月 31 日的数据以每秒 1 万条的速度注入目标表 tradeStream 中。更多关于 replay、replayDS 函数的介绍可以参考 DolphinDB 历史数据回放教程。
但是,单表回放并不能满足所有的回放要求。因为在实践中,一个领域问题往往需要多个不同类型的消息协作,例如金融领域的行情数据包括逐笔委托、逐笔成交、快照等,为了更好地模拟实际交易中的实时数据流,通常需要将以上三类数据同时进行回放,这时便提出了多表回放的需求。
2. 多表回放
DolphinDB不断优化拓展多表回放的功能,既支持N个输入表一对一回放至N个输出表的N对N回放,又在1.30.17/2.00.5及之后的版本中支持了N个不同结构的输入表同时回放至同一个输出表的N对一异构回放,异构回放能够保证多个数据源的严格时序回放和消费。本小节将对多表回放面临的难点、相应的 DolphinDB 技术解决方案和原理展开介绍。
2.1. N 对 N回放
类似单表回放的原理,replay 函数提供了N 对 N模式的多表回放,即将多个输入表回放至多个目标表,输入表与目标表一一对应。以下是N 对 N模式的多表回放的示例:
orderDS = replayDS(sqlObj=<select * from loadTable("dfs://order", "order") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time) tradeDS = replayDS(sqlObj=<select * from loadTable("dfs://trade", "trade") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time) snapshotDS = replayDS(sqlObj=<select * from loadTable("dfs://snapshot", "snapshot") where Date =2020.12.31>, dateColumn=`Date, timeColumn=`Time) replay(inputTables=[orderDS, tradeDS, snapshotDS], outputTables=[orderStream, tradeStream, snapshotStream], dateColumn=`Date, timeColumn=`Time, replayRate=10000, absoluteRate=true)
以上脚本将三个数据库表中的历史数据分别注入三个目标表中。在N 对 N的模式中,不同表的在同一秒内的两条数据写入目标表的顺序可能和数据中的时间字段的先后关系不一致。此外,下游如果由三个处理线程分别对三个目标表进行订阅与消费,也很难保证表与表之间的数据被处理的顺序关系。因此,N 对 N回放不能保证整体上最严格的时序。
在实践中,一个领域中不同类型的消息是有先后顺序的,比如股票的逐笔成交和逐笔委托,所以在对多个数据源回放时要求每条数据都严格按照时间顺序注入目标表,为此我们需要解决以下问题:
- 不同结构的数据如何统一进行排序和注入以保证整体的顺序?
- 如何保证对多表回放结果的实时消费也是严格按照时序进行的?
2.2. N 对 1 异构回放
面对上述多表回放的难点,DolphinDB 进一步增加了异构模式的多表回放,支持将多个不同表结构的数据表写入到同一张异构流数据表中,从而实现了严格按时间顺序的多表回放。以下是异构模式的多表回放示例:
orderDS = replayDS(sqlObj=<select * from loadTable("dfs://order", "order") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time) tradeDS = replayDS(sqlObj=<select * from loadTable("dfs://trade", "trade") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time) snapshotDS = replayDS(sqlObj=<select * from loadTable("dfs://snapshot", "snapshot") where Date =2020.12.31>, dateColumn=`Date, timeColumn=`Time) inputDict = dict(["order", "trade", "snapshot"], [orderDS, tradeDS, snapshotDS]) replay(inputTables=inputDict, outputTables=messageStream, dateColumn=`Date, timeColumn=`Time, replayRate=10000, absoluteRate=true)
异构回放时将 replay 函数的 inputTables 参数指定为字典,outputTables 参数指定为异构流数据表。inputTables 参数指定多个结构不同的数据源,字典的 key 是用户自定义的字符串,是数据源的唯一标识,将会对应 outputTables 参数指定的表的第二列,字典的 value 是通过 replayDS 定义的数据源或者表。
以上脚本中的输出表 messageStream 为异构流数据表,其表结构如下:
name | typeString | comment |
---|---|---|
msgTime | TIMESTAMP | 消息时间 |
msgType | SYMBOL | 数据源标识:"order"、"trade"、"snapshot" |
msgBody | BLOB | 消息内容,以二进制格式存储 |
异构回放时 outputTables 参数指定的表至少需要包含以上三列,此外,还可以指定各输入表的公共列(列名和类型一致的列)。回放完成后,表 messageStream 的数据预览如下:
表中每行记录对应输入表中的一行记录,msgTime 字段是输入表中的时间列,msgType 字段用来区分来自哪张输入表,msgBody 字段以二进制格式存储了输入表中的记录内容。在回放的过程中,通过异构流数据表这样的数据结构可以对多个数据源进行全局排序,因而保证了多个数据源之间的严格时间顺序。同时,异构流数据表和普通流数据表一样可以被订阅,即多种类型的数据存储在同一张表中被发布并被同一个线程实时处理,因而也保证了消费的严格时序性。
若要对异构流数据表进行数据处理操作,如指标计算等,则需要将二进制格式的消息内容反序列化为原始结构的一行记录。DolphinDB 在脚本语言以及在 API 中均支持了对异构流数据表的解析功能。脚本支持流数据分发引擎 streamFilter 对异构流数据表进行反序列化以及反序列后结果的数据处理;同时,各类 API 在支持流数据订阅功能的基础上,扩展支持了在订阅时对异构流数据表进行反序列化。
3. 多表回放应用:股票行情回放
基于上文提到的异构回放、异构流数据表解析以及 DolphinDB 流处理框架中的其他特性等,本章将结合股票行情回放展示 DolphinDB 异构模式的多表回放功能在实际场景中的应用,包括数据回放以及三种具体的回放结果消费方案:使用内置流计算引擎实时处理数据、实时推送外部消息中间件、外部程序订阅与实时消费。
3.1. 行情回放与消费方案
行情多表回放方案的数据处理流程图如下:
处理流程图说明:
回放与消费流程围绕异构流数据表 messageStream 展开。
图中异构流数据表模块以上,为异构模式的多表回放的示意图,由数据回放工具即 replay 和 replayDS 函数,将存储在多个数据库中的原始数据回放至异构流数据表中。
图中异构流数据表模块以下,分为三条支路,分别对应对回放结果的三种不同的处理方式,从左至右依次是:
- 在 DolphinDB 的订阅中,通过内置的流计算引擎实时计算指标,本文将使用 asof join 引擎实时关联逐笔成交与快照数据,计算个股交易成本并写入结果流数据表;
- 在 DolphinDB 的订阅中,通过 Kafka 插件将回放结果实时写入外部的消息中间件 Kafka;
- 在外部程序中,通过 DolphinDB 的流数据 API 来实时订阅和消费回放结果,本文将使用 C++API。
3.2. 测试数据集
本教程基于上交所某日的股票行情数据进行回放,包括逐笔委托、逐笔成交、Level2 快照三种行情数据,分别存放在分布式数据库 "dfs://order"、"dfs://trade"、"dfs://snapshot" 中,均使用 TSDB 存储引擎,数据概览如下:
数据集 | 字段数 | 总行数 | 数据大小 | 简称 | 分区机制 | 排序列 |
---|---|---|---|---|---|---|
逐笔委托 | 20 | 49018552 | 6.8G | order | VALUE: 交易日, HASH: [SYMBOL, 20] | 股票, 交易时间 |
逐笔成交 | 15 | 43652718 | 3.3G | trade | VALUE: 交易日, HASH: [SYMBOL, 20] | 股票, 交易时间 |
Level2 行情快照 | 55 | 8410359 | 4.1G | snapshot | VALUE: 交易日, HASH: [SYMBOL, 20] | 股票, 交易时间 |
后文也将提供部分原始数据的 csv 文件以及对应的数据导入脚本,以便快速体验回放功能。
3.3. 代码实现
本教程脚本开发工具采用 DolphinDB GUI,相关环境配置见后文开发环境配置。
3.3.1. 股票行情回放
本小节脚本将三个数据库中的不同结构的数据回放至同一个异构流数据表中。完整脚本见附录 股票行情回放. txt。
- 创建异构流数据表 messageStream
colName = `msgTime`msgType`msgBody colType = [TIMESTAMP,SYMBOL, BLOB] messageTemp = streamTable(1000000:0, colName, colType) enableTableShareAndPersistence(table=messageTemp, tableName="messageStream", asynWrite=true, compress=true, cacheSize=1000000, retentionMinutes=1440, flushMode=0, preCache=10000) messageTemp = NULL
messageStream 是共享的异步持久化异构流数据表。为了之后能够对该表进行订阅,必须将其定义为共享的流数据表,共享意味着在当前节点的所有会话中可见。同时此处对流数据表进行持久化,其主要目的是控制该表的最大内存占用,enableTableShareAndPersistence 函数中的 cacheSize 参数规定了该表在内存中最多保留 100 万行。流数据持久化也保障了流数据的备份和恢复,当节点异常关闭后,持久化的数据会在重启时自动载入流数据表以继续流数据消费。
- 三个数据源异构回放至流数据表 messageStream
timeRS = cutPoints(09:15:00.000..15:00:00.000, 100) orderDS = replayDS(sqlObj=<select * from loadTable("dfs://order", "order") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time, timeRepartitionSchema=timeRS) tradeDS = replayDS(sqlObj=<select * from loadTable("dfs://trade", "trade") where Date = 2020.12.31>, dateColumn=`Date, timeColumn=`Time, timeRepartitionSchema=timeRS) snapshotDS = replayDS(sqlObj=<select * from loadTable("dfs://snapshot", "snapshot") where Date =2020.12.31>, dateColumn=`Date, timeColumn=`Time, timeRepartitionSchema=timeRS) inputDict = dict(["order", "trade", "snapshot"], [orderDS, tradeDS, snapshotDS]) submitJob("replay", "replay stock market", replay, inputDict, messageStream, `Date, `Time, , , 3)
上述脚本读取三个数据库中的结构不同的数据表进行全速的异构回放,回放通过 submitJob 函数提交后台作业来执行。下面讲解对于回放进行调优的相关参数和原理:
replayDS 函数中的 timeRepartitionSchema 参数:replayDS 函数将输入的 SQL 查询转化为数据源,其会根据输入表的分区以及 timeRepartitionSchema 参数,将原始的 SQL 查询按照时间顺序拆分成若干小的 SQL 查询。
replay 函数中的 parallelLevel 参数:parallelLevel 表示从数据库加载数据到内存的工作线程数量,即同时查询经过划分之后的小数据源的并行度,默认为 1,上述脚本中通过 submitjob 的参数设置为 3。
对数据库表的回放过程分为两步,其一是通过 SQL 查询历史数据至内存,查询包括对数据的排序,其二是将内存中的数据写入输出表,两步以流水线的方式执行。若将某日数据全部导入内存并排序,会占用大量内存甚至导致内存不足,同时由于全部数据的查询耗时比较长,会导致第二步写入输出表的时刻也相应推迟。以 orderDS 为例,若不设置 timeRepartitionSchema 参数,则相应的 SQL 查询为:
select * from loadTable("dfs://order", "order") where Date = 2020.12.31 order by Time
因此针对大数据量的场景,本教程先对 replayDS 函数指定 timeRepartitionSchema 参数,将数据按照时间戳分为 100 个小数据源,则每次查询的最小单位为其中一个小数据源,同时提高 parallelLevel 参数来帮助提升查询速度。以 orderDS 为例,若设置上述 timeRepartitionSchema 参数,则相应的其中一个 SQL 查询为:
select * from loadTable("dfs://order", "order") where Date = 2020.12.31, 09:15:00.000 <= Time < 09:18:27.001 order by Time
后文的 性能测试 章节利用本节的脚本进行了性能测试,最终总耗时 4m18s,内存占用峰值 4.7GB。内存主要由回放过程中的 SQL 查询和输出表等占用,通过对数据源进行切割以及对输出表进行持久化能有效控制内存使用。
作业运维:在 submitJob 函数提交后,通过 getRecentJobs 函数可以查看后台作业的状态,如果 endTime 和 errorMsg 为空,说明任务正在正常运行中。也可以用 cancelJob 函数取消回放,其输入参数 jobId 通过 getRecentJobs 获取。
若没有可作为数据源的数据库,也可以通过加载 csv 文件至内存中进行回放来快速体验本教程,附录中的数据文件提供了 100 支股票的某日完整行情数据,全部数据在内存中约占 700M。以下脚本需要修改 loadText 的路径为实际的 csv 文本数据存储路径。
orderDS = select * from loadText("/yourDataPath/replayData/order.csv") order by Time tradeDS = select * from loadText("/yourDataPath/replayData/trade.csv") order by Time snapshotDS = select * from loadText("/yourDataPath/replayData/snapshot.csv") order by Time inputDict = dict(["order", "trade", "snapshot"], [orderDS, tradeDS, snapshotDS]) submitJob("replay", "replay text", replay, inputDict, messageStream, `Date, `Time, , , 1)
3.3.2. 消费场景 1:在 DolphinDB 订阅中实时计算个股交易成本
本小节脚本将实时消费小节创建的流数据表 messageStream,使用 asof join 引擎实时关联逐笔成交与快照数据,并计算个股交易成本,完整脚本见附录 消费场景 1: 计算个股交易成本_asofJoin 实现. txt。
- 创建计算结果输出表 prevailingQuotes
colName = `TradeTime`SecurityID`Price`TradeQty`BidPX1`OfferPX1`Spread`SnapshotTime colType = [TIME, SYMBOL, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE, TIME] prevailingQuotesTemp = streamTable(1000000:0, colName, colType) enableTableShareAndPersistence(table=prevailingQuotesTemp, tableName="prevailingQuotes", asynWrite=true, compress=true, cacheSize=1000000, retentionMinutes=1440, flushMode=0, preCache=10000) prevailingQuotesTemp = NULL
prevailingQuotes 被定义为共享流数据表,之后可以对其进行订阅和进一步的处理。
- 创建流计算 asof join 引擎
def createSchemaTable(dbName, tableName){ schema = loadTable(dbName, tableName).schema().colDefs return table(1:0, schema.name, schema.typeString) } tradeSchema = createSchemaTable("dfs://trade", "trade") snapshotSchema = createSchemaTable("dfs://snapshot", "snapshot") joinEngine=createAsofJoinEngine(name="tradeJoinSnapshot", leftTable=tradeSchema, rightTable=snapshotSchema, outputTable=prevailingQuotes, metrics=<[Price, TradeQty, BidPX1, OfferPX1, abs(Price-(BidPX1+OfferPX1)/2), snapshotSchema.Time]>, matchingColumn=`SecurityID, timeColumn=`Time, useSystemTime=false, delayedTime=1)
使用 asof join 引擎实现在对股票分组的基础上,对于每条输入的 trade 记录,实时关联与之在时间列上最接近的一条 snapshot 记录,并使用 trade 中的价格字段和 snapshot 中的报价字段进行指标计算。最终,以上配置的 asof join 引擎会输出和左表行数相同的结果。asof join 引擎更多介绍请参考第八章 createAsofJoinEngine
的相关介绍。
考虑到实际的业务含义,此例中 asof join 引擎在用于两个数据流的实时关联时,配置参数 useSystemTime=false 以按照数据中的时间列进行关联计算。使用数据中的时间列,相较于使用数据注入引擎时的系统时间作为时间列,可以避免在实时场景中两个数据流到达引擎的时刻乱序而带来的问题。但是,在此处,因为异构回放能够严格保证两个数据流的处理顺序,因此也可以使用数据注入引擎的系统时间进行关联计算。除了在使用 asof join 引擎时配置参数 useSystemTime=true 外,使用 look up join 引擎也能够实现按系统时间进行实时关联,即当每一条 trade 表中的记录注入引擎时,总是立刻去关联已经注入引擎的最新的一条相应股票的 snapshot 记录,得到的计算结果会同上文中的 asof join 引擎实现完全一致,而由于 look up join 引擎在内部实现上更简单,所以在计算性能上会有稍好的表现,完成脚本见附录 消费场景 1: 计算个股交易成本_lookUpJoin 实现. txt。look up join 引擎更多介绍请参考第八章中createLookUpJoinEngine
的相关介绍。
自定义函数 createSchemaTable,用于获取数据库表的表结构,以做为创建引擎时的参数传入。
- 创建流计算过滤与分发引擎
def appendLeftStream(msg){ tempMsg = select * from msg where Price > 0 and Time>=09:30:00.000 getLeftStream(getStreamEngine(`tradeJoinSnapshot)).tableInsert(tempMsg) } filter1 = dict(STRING,ANY) filter1["condition"] = "trade" filter1["handler"] = appendLeftStream filter2 = dict(STRING,ANY) filter2["condition"] = "snapshot" filter2["handler"] = getRightStream(getStreamEngine(`tradeJoinSnapshot)) schema = dict(["trade", "snapshot"], [tradeSchema, snapshotSchema]) engine = streamFilter(name="streamFilter", dummyTable=messageStream, filter=[filter1, filter2], msgSchema=schema)
streamFilter 函数通过设置 msgSchema 参数,会对异构流数据表进行反序列,并根据 filter 参数中设置的 handler 来处理订阅的数据。当订阅数据到来时,handler 之间是串行执行的,这样就保证了对数据的处理严格按照时序进行。
handler 参数是一元函数或数据表,用于处理订阅的数据。当它是函数时,其唯一的参数是经过解析和过滤后的数据表。这里对于 trade 数据其 handler 设置为函数 appendLeftStream,该函数对订阅到的数据首先做过滤,再将符合条件的数据作为左表写入到 asof join 引擎。对于 snapshot 数据其 handler 设置为 asof join 引擎的右表,表示对于订阅到的 snapshot 数据直接作为右表写入 asof join 引擎。
- 订阅异构流数据表
subscribeTable(tableName="messageStream", actionName="tradeJoinSnapshot", offset=-1, handler=engine, msgAsTable=true, reconnect=true)
设置参数 offset 为 -1,订阅将会从提交订阅时流数据表的当前行开始。为了消费到完整的数据,建议先执行此处脚本以提交订阅,再提交后台回放作业,参考 3.3.1 小节。
- 查看计算结果
asof join 引擎在 matchingColumn 的分组内输出表数据与输入时的顺序一致。
3.3.3. 消费场景 2:在 DolphinDB 订阅中将回放结果实时推送 Kafka
本小节脚本将前文中创建的流数据表 messageStream 实时发送至消息中间件 Kafka。执行以下脚本,需要有可以写入的 Kafka server,并且安装 DolphinDB Kafka 插件,插件配置见后文开发环境配置。完整脚本见附录 消费场景 2: 实时推送 Kafka.txt。
- 加载 Kafka 插件并创建 Kafka producer
loadPlugin("/DolphinDB/server/plugins/kafka/PluginKafka.txt") go producerCfg = dict(STRING, ANY) producerCfg["metadata.broker.list"] = "localhost" producer = kafka::producer(producerCfg)
loadPlugin 的路径和 producer 配置请按需修改。本教程涉及的 Kafka server 和 DolphinDB server 在同一台服务器上,故 metadata.broker.list 参数为 localhost。
- 定义推送数据至 Kafka topic 的函数
def sendMsgToKafkaFunc(dataType, producer, msg){ startTime = now() try { kafka::produce(producer, "topic-message", 1, msg, true) cost = now() - startTime writeLog("[Kafka Plugin] Successed to send" + dataType + ":" + msg.size() + "rows," + cost + "ms.") } catch(ex) {writeLog("[Kafka Plugin] Failed to send msg to kafka with error:" +ex)} }
kafka::produce 函数会将任意表结构的 msg 以 json 格式发送至指定的 Kafka topic。此处的 writeLog 函数在日志中打印每批推送的情况来方便运维观察。
- 注册流数据过滤与分发引擎
filter1 = dict(STRING,ANY) filter1["condition"] = "order" filter1["handler"] = sendMsgToKafkaFunc{"order", producer} filter2 = dict(STRING,ANY) filter2["condition"] = "trade" filter2["handler"] = sendMsgToKafkaFunc{"trade", producer} filter3 = dict(STRING,ANY) filter3["condition"] = "snapshot" filter3["handler"] = sendMsgToKafkaFunc{"snapshot", producer} schema = dict(["order","trade", "snapshot"], [loadTable("dfs://order", "order"), loadTable("dfs://trade", "trade"), loadTable("dfs://snapshot", "snapshot")]) engine = streamFilter(name="streamFilter", dummyTable=messageStream, filter=[filter1, filter2, filter3], msgSchema=schema)
streamFilter 函数通过设置 msgSchema 参数,会对异构流数据表进行反序列,并根据 filter 参数中设置的 handler 来处理订阅的数据。当订阅数据到来时,handler 之间是串行执行的,这样就保证了对数据的处理严格按照时序进行。
handler 参数是一元函数或数据表,用于处理订阅的数据。当它是函数时,其唯一的参数是经过解析和过滤后的数据表。sendMsgToKafka{"order", producer} 的写法是函数化编程中的部分应用,即指定函数 sendMsgToKafka 的部分参数,产生一个参数较少的新函数。
- 订阅异构流数据表
subscribeTable(tableName="messageStream", actionName="sendMsgToKafka", offset=-1, handler=engine, msgAsTable=true, reconnect=true)
设置参数 offset 为 -1,订阅将会从提交订阅时流数据表的当前行开始。为了消费到完整的数据,建议先执行此处脚本以提交订阅,再提交后台回放作业,参考 3.3.1 小节。
- 在终端查看发送结果
在命令行开启消费者进程,从第一条开始消费名为 topic-message 的 topic
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic topic-message
返回:
3.3.4. 消费场景 3:在外部程序中通过 C++API 实时订阅与处理
本小节代码为 C++ 程序,程序会订阅前文创建的异构流数据表 messageStream,并实时打印每条数据。以下代码依赖DolphinDB C++ API,API安装见后文开发环境配置。完整代码见附录 消费场景 3:C++API 实时订阅. cpp。
int main(int argc, char *argv[]){
DBConnection conn;
string hostName = "127.0.0.1";
int port = 8848;
bool ret = conn.connect(hostName, port);
conn.run("login(\"admin\", \"123456\")");
DictionarySP t1schema = conn.run("loadTable(\"dfs://snapshotL2\", \"snapshotL2\").schema()");
DictionarySP t2schema = conn.run("loadTable(\"dfs://trade\", \"trade\").schema()");
DictionarySP t3schema = conn.run("loadTable(\"dfs://order\", \"order\").schema()");
unordered_map<string, DictionarySP> sym2schema;
sym2schema["snapshot"] = t1schema;
sym2schema["trade"] = t2schema;
sym2schema["order"] = t3schema;
StreamDeserializerSP sdsp = new StreamDeserializer(sym2schema);
auto myHandler = [&](Message msg) {
const string &symbol = msg.getSymbol();
cout << symbol << ":";
size_t len = msg->size();
for (int i = 0; i < len; i++) {
cout <<msg->get(i)->getString() << ",";
}
cout << endl;
};
int listenport = 10260;
ThreadedClient threadedClient(listenport);
auto thread = threadedClient.subscribe(hostName, port, myHandler, "messageStream", "printMessageStream", -1, true, nullptr, false, false, sdsp);
cout<<"Successed to subscribe messageStream"<<endl;
thread->join();
return 0;
}
调用订阅函数 ThreadedClient::subscribe 订阅异构流数据表时,在最后一个参数指定相应的流数据反序列化实例 StreamDeserializerSP,则在订阅时会对收到的数据进行反序列化再传递给用户自定义的回调函数 myHandler。
listenport 参数为单线程客户端的订阅端口号,设置 C++ 程序所在服务器的任意空闲端口即可。
- 在终端查看程序实时打印的内容:
3.3.5. 清理环境
如果反复执行上述脚本,可能需要清理流数据表、取消订阅、注销流计算引擎等操作,建议在运行回放与消费脚本前先清理环境。本教程的清理环境脚本见附录 清理环境. txt。
4. 性能测试
本教程对异构模式下的多表回放功能进行了性能测试。以第三章提及的测试数据集作为回放的输入,以第三章第三小节的回放脚本作为测试脚本,该脚本不设定回放速率(即以最快的速率回放),并且输出表没有任何订阅,最终回放了 101,081,629 条数据至输出表中,总耗时 4m18s,每秒回放约 39 万条数据,内存占用峰值 4.7GB。测试使用的服务器的 CPU 为 Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz,更详细的服务器及DolphinDB server配置信息见后文开发环境配置。
5. 开发环境配置
服务器环境
- CPU 类型:Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz
- 逻辑 CPU 总数:8
- 内存:64GB
- OS:64 位 CentOS Linux 7 (Core)
DolphinDB server 部署
- server 版本:2.00.6
- server 部署模式:单节点
- 配置文件:dolphindb.cfg
localSite=localhost:8848:local8848
mode=single
maxMemSize=32
maxConnections=512
workerNum=8
maxConnectionPerSite=15
newValuePartitionPolicy=add
webWorkerNum=2
dataSync=1
persistenceDir=/DolphinDB/server/persistenceDir
maxPubConnections=64
subExecutors=16
subPort=8849
subThrottle=1
persistenceWorkerNum=1
lanCluster=0
注意: 配置参数 persistenceDir
需要开发人员根据实际环境配置。
单节点部署教程:单节点部署
DolphinDB client 开发环境
- CPU 类型:Intel(R) Core(TM) i7-7700 CPU @ 3.60GHz 3.60 GHz
- 逻辑 CPU 总数:8
- 内存:32GB
- OS:Windows 10 专业版
- DolphinDB GUI 版本:1.30.15
DolphinDB Kafka 插件安装
- Kafka 插件版本:release200
注意: Kafka 插件版本建议按 DolphinDB server 版本选择,如 2.00.6 版本的 server 安装 release200 分支的插件
Kafka 插件教程:Kafka 插件教程
Kafka server 部署
- zookeeper 版本:3.4.6
- Kafka 版本:2.12-2.6.2
- Kafka 部署模式:单节点
- 创建 Kafka topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic topic-message
DolphinDB C++ API 安装
- C++ API 版本:release200
注意: C++API 版本建议按 DolphinDB server 版本选择,如 2.00.6 版本的 server 安装 release200 分支的 API
C++ API 教程:C++ API 教程
6. 搭建行情回放服务的最佳实践
一个量化策略在生产(交易)环境中运行时,实时数据的处理通常是由事件驱动的。为确保研发和生产使用同一套代码,通常在研发阶段需将历史数据,严格按照事件发生的时间顺序进行回放,以此模拟交易环境。在 DolphinDB 中,用户通过 replay
函数可以实现对静态数据的回放,即将历史数据按照时间顺序以“实时数据”的方式注入流数据表中。对相同时间戳的数据还可以指定额外排序列,使数据回放顺序更接近实时交易场景。
在历史数据回放、股票行情回放两篇教程中已经介绍了 DolphinDB 的回放功能,本教程更加侧重于回放功能的工程化实践。本教程将介绍如何基于 DolphinDB 分布式数据库、回放功能以及 DolphinDB API 搭建一个行情数据回放服务,该服务支持多个用户同时通过 C++ 、 Python 等客户端提交数据回放请求。
6.1. 基于 DolphinDB 的行情回放服务
本教程实现的行情回放服务基于 3 类国内 A 股行情数"data_replay.md"数据,支持以下功能与特性:
- C++、Python 客户端提交回放请求(指定回放股票列表 、回放日期、回放速率、回放数据源)
- 多个用户同时回放
- 多个数据源同时有序回放
- 在时间有序的基础上支持排序列有序(如:针对逐笔数据中的交易所原始消息记录号排序)
- 发布回放结束信号
- 对回放结果订阅消费
6.1.1. 行情回放服务架构
本教程示例 DolphinDB 搭建的行情回放服务架构如下图所示:
- 行情数据接入:实时行情数据和历史行情数据可以通过 DolphinDB API 或插件存储到 DolphinDB 分布式时序数据库中。
- 函数模块封装:数据查询和回放过程可以通过 DolphinDB 函数视图封装内置,仅暴露股票列表 、回放日期、回放速率、回放数据源等关键参数给行情服务用户。
- 行情用户请求:需要进行行情回放的用户可以通过 DolphinDB 客户端软件(如 DolphinDB GUI 工具、DolphinDB VS Code 插件、DolphinDB API 等)调用封装好的回放函数对存储在数据库中的行情数据进行回放,同时,用户还可以在客户端对回放结果进行实时订阅消费。此外,支持多用户并发回放。
6.1.2. 回放服务搭建步骤
本教程示例 DolphinDB 搭建行情回放服务的具体操作步骤如下图所示:
- Step 1:服务提供者设计合理分区的数据库表,在 DolphinDB 集成开发环境中执行对应的建库建表、数据导入等脚本,以将历史行情数据存储到 DolphinDB 分布式数据库中作为回放服务的数据源。在第二章将给出本教程涉及的 3 类行情数据的分区存储方案及建库建表脚本。
- Step 2:服务提供者在 DolphinDB 集成开发环境中将回放过程中的操作封装成函数视图,通过封装使得行情服务用户不需要关心 DolphinDB 回放功能的细节,只需要指定简单的回放参数(股票、日期、回放速率、数据源)即可提交回放请求。在第三章将给全部函数视图的定义脚本。
- Step 3:服务提供者在外部程序中通过 DolphinDB API 调用上述函数视图实现提交回放的功能。在第四章将给出 API 端提交回放任务的 C++ 实现和 Python 实现。此外,在第四章提交回放的基础上,在第五章将介绍对回放结果的 API 端订阅与消费的代码实现。
在第六章将给出多用户多表回放、多天回放的性能测试结果。最后两章为开发环境配置与总结。
6.2. 行情数据分区存储方案
本教程的回放服务基于 3 类国内 A 股行情数据源:逐笔成交数据、逐笔委托数据、快照数据,均使用 TSDB 存储引擎存储在 DolphinDB 分布式数据库中。
数据源 | 代码样例中的分区数据库路径 | 代码样例中的表名 | 分区机制 | 排序列 | 建库建表脚本 |
---|---|---|---|---|---|
逐笔委托 | dfs://Test_order | order | VALUE: 交易日, HASH: [SYMBOL, 25] | 股票ID,交易时间 | 附录 逐笔委托建库建表脚本 |
逐笔成交 | dfs://Test_transaction | transaction | VALUE: 交易日, HASH: [SYMBOL, 25] | 股票ID,交易时间 | 附录 逐笔成交建库建表脚本 |
快照 | dfs://Test_snapshot | snapshot | VALUE: 交易日, HASH: [SYMBOL, 20] | 股票ID,交易时间 | 附录 快照建库建表脚本 |
回放的原理是从数据库中读取需要的行情数据,并根据时间列排序后写入到相应的流数据表。因此,读数据库并排序的性能对回放速度有很大的影响,合理的分区机制将有助于提高数据加载速度。基于用户通常按照日期和股票提交回放请求的特点,设计了上表的分区方案。
此外,本教程的历史数据存储在三节点双副本的 DolphinDB 集群中,集群和副本同样可以提升读取性能,同时可以增加系统的可用性,分区的副本通常是存放在不同的物理节点的,所以一旦某个分区不可用,系统依然可以调用其它副本分区来保证回放服务的正常运转。
附录将提供部分原始数据的 csv 文件(原始行情数据文件)以及对应的示例导入脚本(逐笔委托示例导入脚本、逐笔成交示例导入脚本、快照示例导入脚本),以便读者快速体验搭建本教程所示的行情回放服务。
6.3. 行情回放自定义函数
本章介绍回放过程中的主要函数功能及其实现,最后将函数封装成视图以便通过 API 等方式调用。
本章的开发工具采用 DolphinDB GUI,完整脚本见附录:行情回放函数。
函数名 | 函数入参 | 函数功能 |
---|---|---|
stkReplay | stkList:回放股票列表startDate:回放开始日期endDate:回放结束日期replayRate:回放速率replayUuid:回放用户标识replayName:回放数据源名称 | 行情回放服务主函数 |
dsTb | timeRS:数据源时间划分startDate:回放开始日期endDate:回放结束日期stkList:回放股票列表replayName:回放数据源名称 | 构造回放数据源 |
createEnd | tabName:回放输出表名称sortColumn:相同时间戳时额外排序列名 | 构造回放结束信号 |
replayJob | inputDict:回放输入数据源tabName:回放输出表名称dateDict:回放排序时间列timeDict:回放排序时间列replayRate:回放速率sortColumn:相同时间戳时额外排序列名 | 定义回放任务内容 |
6.3.1. stkReplay 函数:行情回放服务主函数
函数定义代码:
def stkReplay(stkList, mutable startDate, mutable endDate, replayRate, replayUuid, replayName)
{
maxCnt = 50
returnBody = dict(STRING, STRING)
startDate = datetimeParse(startDate, "yyyyMMdd")
endDate = datetimeParse(endDate, "yyyyMMdd") + 1
sortColumn = "ApplSeqNum"
if(stkList.size() > maxCnt)
{
returnBody["errorCode"] = "0"
returnBody["errorMsg"] = "超过单次回放股票上限,最大回放上限:" + string(maxCnt)
return returnBody
}
if(size(replayName) != 0)
{
for(name in replayName)
{
if(not name in ["snapshot", "order", "transaction"])
{
returnBody["errorCode"] = "0"
returnBody["errorMsg"] = "请输入正确的数据源名称,不能识别的数据源名称:" + name
return returnBody
}
}
}
else
{
returnBody["errorCode"] = "0"
returnBody["errorMsg"] = "缺少回放数据源,请输入正确的数据源名称"
return returnBody
}
try
{
if(size(replayName) == 1 && replayName[0] == "snapshot")
{
colName = ["timestamp", "biz_type", "biz_data"]
colType = [TIMESTAMP, SYMBOL, BLOB]
sortColumn = "NULL"
}
else
{
colName = ["timestamp", "biz_type", "biz_data", sortColumn]
colType = [TIMESTAMP, SYMBOL, BLOB, LONG]
}
msgTmp = streamTable(10000000:0, colName, colType)
tabName = "replay_" + replayUuid
enableTableShareAndPersistence(table=msgTmp, tableName=tabName, asynWrite=true, compress=true, cacheSize=10000000, retentionMinutes=60, flushMode=0, preCache=1000000)
timeRS = cutPoints(09:30:00.000..15:00:00.000, 23)
inputDict = dict(replayName, each(dsTb{timeRS, startDate, endDate, stkList}, replayName))
dateDict = dict(replayName, take(`MDDate, replayName.size()))
timeDict = dict(replayName, take(`MDTime, replayName.size()))
jobId = "replay_" + replayUuid
jobDesc = "replay stock data"
submitJob(jobId, jobDesc, replayJob{inputDict, tabName, dateDict, timeDict, replayRate, sortColumn})
returnBody["errorCode"] = "1"
returnBody["errorMsg"] = "后台回放成功"
return returnBody
}
catch(ex)
{
returnBody["errorCode"] = "0"
returnBody["errorMsg"] = "回放行情数据异常,异常信息:" + ex
return returnBody
}
}
函数功能:
自定义函数 stkReplay 是整个回放的主体函数,用户传入的参数在 stkReplay 里会进行有效性判断及格式处理,可以根据实际需求更改。
首先,用 maxCnt 来控制用户一次回放股票数量的最大上限,本例中设置的是 50 。returnBody 构造了信息字典,返回给用户以提示执行错误或执行成功。回放开始日期 startDate 和回放结束日期 endDate 利用datetimeParse
函数进行格式处理 。replayRate 是回放速率,replayUuid 是回放表名名称,replayName 是回放数据源列表,sortColumn 是数据源同回放时间戳排序列列名。
当输入参数无误后,便初始化回放结果流数据表,结果流数据表为异构流数据表,字段类型为 BLOB 的字段包含了一条原始记录的全部信息,同时结果流数据表为持久化流数据表,enableTableShareAndPersistence
函数把流数据表共享并把它持久化到磁盘上,使用持久化流数据表可以避免内存占用过大。当回放数据源包含逐笔成交(transaction )或逐笔委托(order)时,本例实现了对相同时间戳的逐笔数据按交易所原始消息记录号(ApplSeqNum)进行排序(具体实现见 replayJob 函数),所以结果流数据表中必须冗余一列来存放排序列。若回放数据源仅包含快照(snapshot)时,则不需要冗余一列排序列。
定义回放需要的其他参数。inputDict 构造了回放数据源列表字典,利用 each
函数可以对多个数据源进行简洁的定义。dateDict 和 timeDict 构造了回放数据源时间戳字典。最后通过submitJob
提交后台回放任务。
6.3.2. dsTb 函数:构造回放数据源
函数定义代码:
def dsTb(timeRS, startDate, endDate, stkList, replayName)
{
if(replayName == "snapshot"){
tab = loadTable("dfs://Test_snapshot", "snapshot")
}
else if(replayName == "order") {
tab = loadTable("dfs://Test_order", "order")
}
else if(replayName == "transaction") {
tab = loadTable("dfs://Test_transaction", "transaction")
}
else {
return NULL
}
ds = replayDS(sqlObj=<select * from tab where MDDate>=startDate and MDDate<endDate and HTSCSecurityID in stkList>, dateColumn='MDDate', timeColumn='MDTime', timeRepartitionSchema=timeRS)
return ds
}
函数功能:
自定义函数 dsTb 返回符合用户回放需求的数据源划分结果。主要是对内置 replayDS 函数进行了进一步的封装,函数首先对用户端输入的 replayName 进行判断,选择从数据库加载对应的表对象。利用 replayDS 对交易日期在 startDate 至 endDate 之间、股票代号在 stkList 内的数据按照 timeRS 进行数据源划分,返回某一个数据源的列表。
timeRS 参数对应 replayDS 函数中的 timeRepartitionSchema 参数,是时间类型的向量,可用于将数据源划分为更小粒度的多个数据源,以保证查询 DFS 表中数据的效率以及控制内存大小。本例在 stkReplay 函数 中构造了 timeRS 变量,其意为对于一天的数据在有效时间段内平均分为 23 份。
执行如下代码查看 dsTb
函数返回的结果:
timeRS = cutPoints(09:30:00.000..15:00:00.000, 3)
startDate = 2021.12.01
endDate = 2021.12.02
stkList = ['000616.SZ']
replayName = ["order"]
ds = dsTb(timeRS, startDate, endDate, stkList, replayName)
ds 为一个向量,其中每一个元素如下,数据源被划分为多个小的 SQL 查询语句,具体原理参考replayDS
函数。
DataSource< select [4] * from tab where time(MDTime) < 09:30:00.000,nanotime(MDTime) >= 00:00:00.000000000,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >
DataSource< select [4] * from tab where time(MDTime) < 11:20:00.001,time(MDTime) >= 09:30:00.000,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >
DataSource< select [4] * from tab where time(MDTime) < 13:10:00.001,time(MDTime) >= 11:20:00.001,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >
DataSource< select [4] * from tab where time(MDTime) < 15:00:00.001,time(MDTime) >= 13:10:00.001,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >
DataSource< select [4] * from tab where nanotime(MDTime) <= 23:59:59.999999999,time(MDTime) >= 15:00:00.001,date(MDDate) == 2021.12.01,MDDate >= 2021.12.01 and MDDate < 2021.12.02 and SecurityID in ["000616.SZ"] order by MDDate asc,MDTime asc >
6.3.3. replayJob 函数:定义回放任务内容
函数定义代码:
def replayJob(inputDict, tabName, dateDict, timeDict, replayRate, sortColumn)
{
if(sortColumn == "NULL")
{
replay(inputTables=inputDict, outputTables=objByName(tabName), dateColumn=dateDict, timeColumn=timeDict, replayRate=int(replayRate), absoluteRate=false, parallelLevel=23)
}
else
{
replay(inputTables=inputDict, outputTables=objByName(tabName), dateColumn=dateDict, timeColumn=timeDict, replayRate=int(replayRate), absoluteRate=false, parallelLevel=23, sortColumns=sortColumn)
}
createEnd(tabName, sortColumn)
}
函数功能:
自定义函数 replayJob
提交用户数据回放并调用 createEnd
函数,这里首先通过内置 replay
函数回放用户需求数据,此处回放模式为 N 对 1 异构回放。replay
函数返回后即表示需要的数据已经全部回放结束,再执行自定义 createEnd
函数以构造结束信号写入回放结果里。调用 createEnd
函数是可选的,其功能是在回放结束时再发布一条特殊的记录,以标识回放结束。
参数 sortColumn 用于指定额外的排序列,如果用户回放的数据源为仅仅包含快照(snapshot),对于这种特殊情况需要指定 replayJob
函数的回放时间戳排序列参数 sortColumn 为 NULL ,则调用内置的 replay
函数时不加入 sortColumn 参数。
6.3.4. createEnd 函数:构造回放结束信号
函数定义代码:
def createEnd(tabName, sortColumn)
{
dbName = "dfs://End"
tbName = "endline"
if(not existsDatabase(dbName))
{
db = database(directory=dbName, partitionType=VALUE, partitionScheme=2023.04.03..2023.04.04)
endTb = table(2200.01.01T23:59:59.000 as DateTime, `END as point, long(0) as ApplSeqNum)
endLine = db.createPartitionedTable(table=endTb, tableName=tbName, partitionColumns=`DateTime)
endLine.append!(endTb)
}
ds = replayDS(sqlObj=<select * from loadTable(dbName, tbName)>, dateColumn='DateTime', timeColumn='DateTime')
inputEnd = dict(["end"], [ds])
dateEnd = dict(["end"], [`DateTime])
timeEnd = dict(["end"], [`DateTime])
if(sortColumn == "NULL")
{
replay(inputTables=inputEnd, outputTables=objByName(tabName), dateColumn=dateEnd, timeColumn=timeEnd, replayRate=-1, absoluteRate=false, parallelLevel=1)
}
else
{
replay(inputTables=inputEnd, outputTables=objByName(tabName), dateColumn=dateEnd, timeColumn=timeEnd, replayRate=-1, absoluteRate=false, parallelLevel=1, sortColumns=sortColumn)
}
}
函数功能:
自定义函数 createEnd 是在回放结束时给用户提供一条回放结束信号,利用了 replay 回放模式中的 N 对 1 异构回放构造回放信号,会往参数 tabName 指定的异构流数据表中写入一条消息类型列为 end 的记录。为方便后期异构消费解析以及复用,此处为结束信号独立建一个数据库并创建分区表,表内必须有时间列,其他字段可选。并向该表写入一条模拟数据,其数据内容没有任何强制要求。DolphinDB 建库建表相关知识可参考分区数据库 。inputEnd、dateEnd、timeEnd 字典的 key 按需设置为字符串 end,将对应 replay 函数指定的输出表中的第二个字典,即消息类型。
参数 sortColumn 用于指定额外的排序列,如果用户回放的数据源为仅仅包含快照(snapshot),则调用内置的 replay 函数时不加入 sortColumn 参数,结果流数据表中的结束信号记录如下。
6.3.5. 封装函数视图
将以上函数利用addFunctionView
封装成函数视图,具体代码如下。 API 端仅需要调用主函数 stkReplay
,第四章回放全部基于该函数视图。
addFunctionView(dsTb)
addFunctionView(createEnd)
addFunctionView(replayJob)
addFunctionView(stkReplay)
6.4. API 提交回放
6.4.1. C++ API
本例程序运行在 Linux 系统 DolphinDB C++ API 环境下,编译成可执行文件以命令行输入参数执行。完整脚本见附录 C++ 源码。
6.4.1.1. C++ API 调用回放服务
C++ API 调用回放服务代码如下:
DictionarySP result = conn.run("stkReplay", args);
string errorCode = result->get(Util::createString("errorCode"))->getString();
if (errorCode != "1")
{
std::cout << result->getString() << endl;
return -1;
}
conn 是 DolphinDB C++ API DBConnection 对象, C++ 应用可以通过它在 DolphinDB 服务器上执行脚本和函数并在两者之间双向传递数据。conn 通过 run
方法调用 stkReplay 自定义函数,args 容器内包含了 stkReplay
函数所需的参数。执行结果返回到字典 result 中,可以通过 get
方法来获得 key 为 errorCode 对应的值,如果错误代号不为 1 ,则代表回放执行错误,返回错误信息并结束程序。
多用户进行回放时,需要对用户回放的流数据表进行命名,为了不重复命名以避免冲突,对每个回放用户生成唯一识别号,会生成形如 “Eq8Jk8Dd0Tw5Ej8D” 的识别字符串,代码如下:
string uuid(int len)
{
char* str = (char*)malloc(len + 1);
srand(getpid());
for (int i = 0; i < len; ++i)
{
switch (i % 3)
{
case 0:
str[i] = 'A' + std::rand() % 26;
break;
case 1:
str[i] = 'a' + std::rand() % 26;
break;
default:
str[i] = '0' + std::rand() % 10;
break;
}
}
str[len] = '\0';
std::string rst = str;
free(str);
return rst;
}
6.4.2. Python API
本例程序运行在 Windows 系统 DolphinDB Python API 环境下,可以在任何具备该环境的客户端执行。完整脚本见附录 Python 源码。
6.4.2.1. Python API 调用回放服务
Python API 调用回放服务代码如下:
stk_list = ['000616.SZ','000681.SZ']
start_date = '20211201'
end_date = '20211201'
replay_rate = -1
replay_name = ['snapshot']
s.upload({'stk_list':stk_list, 'start_date':start_date, 'end_date':end_date, 'replay_rate':replay_rate, 'replay_uuid':uuidStr, 'replay_name':replay_name})
s.run("stkReplay(stk_list, start_date, end_date, replay_rate, replay_uuid, replay_name)")
stk_list 是回放股票列表,start_date 是回放开始日期,end_date 是回放结束日期,replay_rate 是回放速率,replay_uuid 是回放流数据表名称,replay_name 是回放数据源列表。s 是 DolphinDB 会话,Python 应用通过会话在 DolphinDB 服务器上执行脚本和函数以及在两者之间双向传递数据。使用 s 的 upload
方法上传 Python 对象,使用 run
方法执行 stkReplay 函数。
多用户进行回放时,需要对用户回放的流数据表进行命名,为了不重复命名以避免冲突,对每个回放用户生成唯一识别号,会生成形如“Eq8Jk8Dd0Tw5Ej8D”的识别字符串,代码如下:
def uuid(length):
str=""
for i in range(length):
if(i % 3 == 0):
str += chr(ord('A') + random.randint(0, os.getpid() + 1) % 26)
elif(i % 3 == 1):
str += chr(ord('a') + random.randint(0, os.getpid() + 1) % 26)
else:
str += chr(ord('0') + random.randint(0, os.getpid() + 1) % 10)
return str
uuidStr = uuid(16)
6.5. API 订阅消费
6.5.1. C++ API
6.5.1.1. 反序列化器构造
为使异构流数据表反序列化输出,构造输出表结构,代码如下:
DictionarySP snap_full_schema = conn.run("loadTable(\"dfs://Test_snapshot\", \"snapshot\").schema()");
DictionarySP order_full_schema = conn.run("loadTable(\"dfs://Test_order\", \"order\").schema()");
DictionarySP transac_full_schema = conn.run("loadTable(\"dfs://Test_transaction\", \"transaction\").schema()");
DictionarySP end_full_schema = conn.run("loadTable(\"dfs://End\", \"endline\").schema()");
unordered_map<string, DictionarySP> sym2schema;
sym2schema["snapshot"] = snap_full_schema;
sym2schema["order"] = order_full_schema;
sym2schema["transaction"] = transac_full_schema;
sym2schema["end"] = end_full_schema;
StreamDeserializerSP sdsp = new StreamDeserializer(sym2schema);
上述前四行分别是快照、逐笔委托、逐笔成交、结束信号的表结构构造,回放根据需要选择对应的表结构,结束信号表必须有。unordered_map 创建了 key->schema 的映射表,可以将回放数据源和结束信号按照 schema 进行结构解析。通过 StreamDeserializer 对象来构造异构流数据表反序列化器。
6.5.1.2. 订阅回放服务
在 DolphinDB C++ API 中订阅异构流数据表,代码如下:
int listenport = 10260;
ThreadedClient threadedClient(listenport);
string tableNameUuid = "replay_" + uuidStr;
auto thread = threadedClient.subscribe(hostName, port, myHandler, tableNameUuid, "stkReplay", 0, true, nullptr, true, 500000, 0.001, false, "admin", "123456", sdsp);
std::cout << "Successed to subscribe " + tableNameUuid << endl;
thread->join();
listenport 是单线程客户端的订阅端口号,tableNameUuid 是用户回放的流数据表名称,通过 threadedClient.subscribe 可以订阅回放的流数据表,详情用法参考 C++ API 订阅 。thread 指向循环调用 myHandler 的线程的指针,线程在 topic 被取消订阅后会退出。
6.5.1.3. 消费函数构造
调用订阅函数 threadedClient.subscribe
订阅异构流数据表时,在最后一个参数指定相应的流数据反序列化实例 StreamDeserializerSP ,在订阅时会对收到的数据进行反序列化再传递给用户自定义的回调函数 myHandler
,用户可以在回调函数内自定义消费,本文仅实现了简单的输出消费,当消息 msg 的标识为 end 时,通过 threadedClient.unsubscribe
取消订阅,代码如下:
long sumcount = 0;
long long starttime = Util::getNanoEpochTime();
auto myHandler = [&](vector<Message> msgs)
{
for (auto& msg : msgs)
{
std::cout << msg.getSymbol() << " : " << msg->getString() << endl;
if(msg.getSymbol() == "end")
{
threadedClient.unsubscribe(hostName, port, tableNameUuid,"stkReplay");
}
sumcount += msg->get(0)->size();
}
long long speed = (Util::getNanoEpochTime() - starttime) / sumcount;
std::cout << "callback speed: " << speed << "ns" << endl;
};
6.5.1.4. 程序执行
将如上 C++ 程序代码写入 main.cpp 中,在 DolphinDB C++ API 环境下编译成可执行文件 main ,按照如下命令格式输入,回车,即可开始“回放-订阅-消费”过程。
单支股票最大速度回放 “order“ 表一天数据命令:
$ ./main 000616.SZ 20211201 20211201 -1 order
两支(多支)股票最大速度回放三张表一天数据命令:
$ ./main 000616.SZ,000681.SZ 20211201 20211201 -1 snapshot,order,transaction
6.5.1.5. 消费输出
根据前文构造的消费函数,两支股票(“000616.SZ” &“000681.SZ”)最大速度回放三张表一天数据输出如下图所示:
6.5.2. Python API
6.5.2.1. 反序列化器构造
为使异构流数据表反序列化输出,Python API 通过 streamDeserializer 类来构造异构流数据表反序列化器,同时定义输出表结构,代码如下:
sd = ddb.streamDeserializer({
'snapshot': ["dfs://Test_snapshot", "snapshot"],
'order': ["dfs://Test_order", "order"],
'transaction': ["dfs://Test_transaction", "transaction"],
'end': ["dfs://End", "endline"],
}, s)
上述主体部分分别是快照、逐笔委托、逐笔成交、结束信号的表结构构造,回放根据需要选择对应的表结构,结束信号表必须有。
6.5.2.2. Python API 订阅回放服务
在 DolphinDB Python API 中订阅异构流数据表,代码如下:
s.enableStreaming(0)
s.subscribe(host=hostname, port=portname, handler=myHandler, tableName="replay_"+uuidStr, actionName="replay_stock_data", offset=0, resub=False, msgAsTable=False, streamDeserializer=sd, userName="admin", password="123456")
event.wait()
enableStreaming
函数启用流数据功能,函数参数指定开启数据订阅的端口。s 使用 subscribe
函数来订阅 DolphinDB 中的流数据表。订阅是异步执行的,event.wait()
保持主线程不退出。
6.5.2.3. 消费函数构造
调用订阅函数 s.subscribe
订阅异构流数据表时,在参数指定相应的流数据反序列化实例 streamDeserializer
,在订阅时会对收到的数据进行反序列化再传递给用户自定义的回调函数 myHandler
,用户可以在回调函数内自定义消费,仅输出消费代码如下:
def myHandler(lst):
if lst[-1] == "snapshot":
print("SNAPSHOT: ", lst)
elif lst[-1] == 'order':
print("ORDER: ", lst)
elif lst[-1] == 'transaction':
print("TRANSACTION: ", lst)
else:
print("END: ", lst)
event.set()
6.5.2.4. 消费输出
根据前文构造的消费函数,两支股票(“000616.SZ” &“000681.SZ”)最大速度回放三张表一天数据输出如下图所示:
6.6. 性能测试
测试的交易日范围从 2021 年 12 月 1 日至 12 月 9 日共 7 个连续交易日。测试脚本见附录:C++ 测试源码。
6.7. 测试服务器配置
CPU 类型 | Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz |
---|---|
逻辑 CPU 总数 | 64 |
内存 | 503 GB |
硬盘 | SSD |
OS | CentOS Linux release 7.9.2009 (Core) |
DolphinDB Server 版本 | 2.00.9.3 2023.03.29 |
6.7.1. 50 支深交所股票一天全速并发回放性能测试
测试方法:选取深交所 2021 年 12 月 1 日 50 支交易股票数据,linux 后台同时启动多个第四章 C++ API 提交回放程序,提交多个并发回放任务。
- 回放耗时=所有回放任务最晚结束时间-所有回放任务最早收到时间
- 回放速率=所有用户回放数据量总和/回放耗时
- 平均单个用户回放耗时=(所有回放任务结束时间和-所有回放任务开始时间和)/并发数量
- 平均单个用户回放速率=单个用户回放数据量/平均单个用户回放耗时
并发数量 | 数据源 | 数据量 | 回放耗时 | 回放速率 | 单个用户回放耗时(平均) | 单个用户回放速率(平均) |
---|---|---|---|---|---|---|
1 | snapshottransactionorder | 2,775,129行 | 约2.669s | 约104w/s | 约2.669s | 约104w/s |
10 | snapshottransactionorder | 27,751,290行 | 约10.161s | 约273w/s | 约8.482s | 约32w/s |
20 | snapshottransactionorder | 55,502,580行 | 约18.761s | 约296w/s | 约16.728s | 约16w/s |
40 | snapshottransactionorder | 111,005,160行 | 约35.537s | 约312w/s | 约19.091s | 约14w/s |
1 | snapshottransaction | 1,416,548行 | 约2.003s | 约70w/s | 约2.003s | 约70w/s |
10 | snapshottransaction | 14,165,480行 | 约7.155s | 约198w/s | 约6.382s | 约22w/s |
20 | snapshottransaction | 28,330,960行 | 约13.115s | 约216w/s | 约11.436s | 约12w/s |
40 | snapshottransaction | 56,661,920行 | 约27.003s | 约210w/s | 约13.740s | 约10w/s |
1 | snapshot | 194,045行 | 约1.387s | 约14w/s | 约1.387s | 约14w/s |
10 | snapshot | 1,940,450行 | 约6.428s | 约30w/s | 约5.128s | 约4w/s |
20 | snapshot | 3,880,900行 | 约11.782s | 约33w/s | 约10.539s | 约2w/s |
40 | snapshot | 7,761,800行 | 约23.274s | 约33w/s | 约12.393s | 约1w/s |
6.7.2. 50 支深交所股票跨天全速回放性能测试
测试方法:选取深交所 50 支交易股票数据,linux 后台启动第四章 C++ API 提交回放程序,提交多天回放任务。
- 回放耗时=回放任务结束时间-回放任务收到时间
- 回放速率=回放数据量/回放耗时
数据源 | 数据量 | 回放耗时 | 回放速率 |
---|---|---|---|
snapshottransactionorder | 一天:2,775,129行 | 约2.925s | 约95w/s |
snapshottransactionorder | 一周:17,366,642行 | 约16.393s | 约106w/s |
snapshottransaction | 一天:1,416,548行 | 约1.912s | 约74w/s |
snapshottransaction | 一周:8,899,562行 | 约9.142s | 约97w/s |
snapshot | 一天:194,045行 | 约1.267s | 约15w/s |
snapshot | 一周:1,279,869行 | 约6.659s | 约19w/s |
6.8. 开发环境配置
6.8.1. 部署 DolphinDB Server
- Server 版本:2.00.9.3 2023.03.29
- Server 部署模式:单服务器集群
- 配置文件:cluster.cfg
maxMemSize=128
maxConnections=5000
workerNum=24
webWorkerNum=2
chunkCacheEngineMemSize=16
newValuePartitionPolicy=add
logLevel=INFO
maxLogSize=512
node1.volumes=/ssd/ssd3/pocTSDB/volumes/node1,/ssd/ssd4/pocTSDB/volumes/node1
node2.volumes=/ssd/ssd5/pocTSDB/volumes/node2,/ssd/ssd6/pocTSDB/volumes/node2
node3.volumes=/ssd/ssd7/pocTSDB/volumes/node3,/ssd/ssd8/pocTSDB/volumes/node3
diskIOConcurrencyLevel=0
node1.redoLogDir=/ssd/ssd3/pocTSDB/redoLog/node1
node2.redoLogDir=/ssd/ssd4/pocTSDB/redoLog/node2
node3.redoLogDir=/ssd/ssd5/pocTSDB/redoLog/node3
node1.chunkMetaDir=/ssd/ssd3/pocTSDB/metaDir/chunkMeta/node1
node2.chunkMetaDir=/ssd/ssd4/pocTSDB/metaDir/chunkMeta/node2
node3.chunkMetaDir=/ssd/ssd5/pocTSDB/metaDir/chunkMeta/node3
node1.persistenceDir=/ssd/ssd6/pocTSDB/persistenceDir/node1
node2.persistenceDir=/ssd/ssd7/pocTSDB/persistenceDir/node2
node3.persistenceDir=/ssd/ssd8/pocTSDB/persistenceDir/node3
maxPubConnections=128
subExecutors=24
subThrottle=1
persistenceWorkerNum=1
node1.subPort=8825
node2.subPort=8826
node3.subPort=8827
maxPartitionNumPerQuery=200000
streamingHAMode=raft
streamingRaftGroups=2:node1:node2:node3
node1.streamingHADir=/ssd/ssd6/pocTSDB/streamingHADir/node1
node2.streamingHADir=/ssd/ssd7/pocTSDB/streamingHADir/node2
node3.streamingHADir=/ssd/ssd8/pocTSDB/streamingHADir/node3
TSDBCacheEngineSize=16
TSDBCacheEngineCompression=false
node1.TSDBRedoLogDir=/ssd/ssd3/pocTSDB/TSDBRedoLogDir/node1
node2.TSDBRedoLogDir=/ssd/ssd4/pocTSDB/TSDBRedoLogDir/node2
node3.TSDBRedoLogDir=/ssd/ssd5/pocTSDB/TSDBRedoLogDir/node3
TSDBLevelFileIndexCacheSize=20
TSDBLevelFileIndexCacheInvalidPercent=0.6
lanCluster=0
enableChunkGranularityConfig=true
注意: 路径配置参数需要开发人员根据实际环境配置。
有关单服务器集群部署教程,请参考:单服务器集群部署。
6.8.2. DolphinDB client 开发环境
- CPU 类型:Intel(R) Core(TM) i5-11500 @ 2.70GHz 2.71 GHz
- 逻辑 CPU 总数:12
- 内存:16GB
- OS:Windows 11 专业版
- DolphinDB GUI 版本:1.30.20.1
6.8.3. DolphinDB C++ API 安装
- C++ API 版本:release200.9
注意: C++ API 版本建议按 DolphinDB server 版本选择,如 2.00.9 版本的 server 安装 release200.9 分支的 API
C++ API 教程:C++ API 教程
6.8.4. DolphinDB Python API 安装
- Python API 版本:1.30.21.1
Python API 教程:Python API 教程
6.9. 路线图 (Roadmap)
- 在后续版本中,
replay
函数将支持原速回放,即严格以两条记录的时间戳之差为输出间隔向下游注入数据; - 在后续版本中,
replay
函数将支持原速倍速回放,即严格以两条记录的时间戳之差为输出间隔进行倍速再向下游注入数据。