Low-Latency Real-Time Computing API Guide
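In scenarios such as order book construction, minute-level market factor computation and tick-by-tick factor analysis, the timeliness, accuracy and efficiency of data processing directly determine how well a strategy executes and how effectively risk is controlled. Exchange tick-by-tick data is high-frequency, highly concurrent and high-volume. A delay on a single record can therefore block the entire computation pipeline. This leaves order books stale, skews factor values, or even triggers false risk-control signals. For example: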
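- In order book construction, if bid and ask depth cannot be synchronized in real time, market sentiment analysis lags behind and trading opportunities may be missed.
- In position monitoring, delayed real-time computation can mask abnormal trading behavior and increase systemic risk.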
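To address these challenges, Swordfish provides a low-latency real-time computing architecture. It processes each record immediately on arrival and applies distributed computing optimizations to deliver high-precision, low-latency data processing. Low-latency mode is enabled through the lowLatency parameter, which reduces processing latency and improves computational efficiency.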
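Low-latency mode mainly applies to three streaming engines: createOrderBookSnapshotEngine, createReactiveStateEngine and createTimeSeriesEngine. This guide shows how to enable low-latency mode on each of these engines and how to write data into them one row at a time, so that you can get started with low-latency real-time computing quickly.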
Low-Latency Mode for the Order Book Snapshot Engine
Set lowLatency=true to enable low-latency mode for the order book snapshot engine. For details, see createOrderBookSnapshotEngine.
In low-latency mode, note the following:
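- Row-by-row ingestion: data must be sent to the engine one row at a time, and batch writes are not allowed. This is what keeps latency to a minimum.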
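- Optimization of user-defined metrics: expressions passed through userDefinedMetrics are type-specialized and optimized, but only qualifying operators can be optimized. For the requirements, see Supported Operators for Optimization.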
- Input data structure: to reduce overhead, data must be written in the market data structure. Swordfish provides the MarketDataRow class, which encapsulates the core fields of exchange tick-by-tick market data. To meet the input requirements of low-latency mode, create MarketDataRow objects and write them into the engine one at a time. MarketDataRow is defined as follows:

```cpp
class MarketDataRow : public Constant {
public:
    MarketDataRow() : Constant() {}
    MarketDataRow(std::string &code, int type, int side, int time, int msgType,
                  int64_t price, int64_t qty, int64_t seq, int64_t buyOrder,
                  int64_t sellOrder, int64_t receiveTime)
        : Constant(), code(code), type(type), side(side), time(time), msgType(msgType),
          price(price), qty(qty), seq(seq), buyOrder(buyOrder), sellOrder(sellOrder),
          receiveTime(receiveTime) {}
    virtual ~MarketDataRow() {}
    virtual DATA_TYPE getRawType() const { return DT_DICTIONARY; }
    virtual ConstantSP getInstance() const { return getValue(); }
    virtual ConstantSP getValue() const { return ConstantSP(new MarketDataRow(*this)); }

    DolphinString code;
    int type;
    int side;
    int time;
    int msgType;
    int64_t price;
    int64_t qty;
    int64_t seq;
    int64_t buyOrder;
    int64_t sellOrder;
    int64_t receiveTime; // When the engine receives the message, in nanoseconds.
};
```
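The sketch below makes the field mapping concrete. It mirrors the MarketDataRow data members in a plain struct and fills them from a parsed tick-by-tick record, in the same argument order as the example that follows (side from bs_flag, msgType from source_type, seq from appl_seq_num). TickRecord, MarketDataRowFields, parseTimeMs and toRowFields are hypothetical names used only for illustration. The "HH:MM:SS.mmm" input format and the conversion to milliseconds since midnight are assumptions. In real code the row is still created through the MarketDataRow constructor.

```cpp
#include <cstdint>
#include <string>

// Hypothetical parsed CSV record; names follow the fields read in the example.
struct TickRecord {
    std::string security_id;
    std::string md_time;  // assumed format "HH:MM:SS.mmm"
    int source_type;
    int type;
    std::int64_t price;
    std::int64_t qty;
    int bs_flag;
    std::int64_t buy_no;
    std::int64_t sell_no;
    std::int64_t appl_seq_num;
};

// Plain mirror of the MarketDataRow data members (illustration only).
struct MarketDataRowFields {
    std::string code;
    int type, side, time, msgType;
    std::int64_t price, qty, seq, buyOrder, sellOrder, receiveTime;
};

// Assumption: converts "HH:MM:SS.mmm" to milliseconds since midnight.
int parseTimeMs(const std::string& s) {
    int h = std::stoi(s.substr(0, 2));
    int m = std::stoi(s.substr(3, 2));
    int sec = std::stoi(s.substr(6, 2));
    int ms = std::stoi(s.substr(9, 3));
    return ((h * 60 + m) * 60 + sec) * 1000 + ms;
}

// Same argument order as the MarketDataRow constructor call in the example:
// code, type, side, time, msgType, price, qty, seq, buyOrder, sellOrder, receiveTime.
MarketDataRowFields toRowFields(const TickRecord& r, std::int64_t receiveTimeNs) {
    return MarketDataRowFields{
        r.security_id, r.type, r.bs_flag, parseTimeMs(r.md_time), r.source_type,
        r.price, r.qty, r.appl_seq_num, r.buy_no, r.sell_no, receiveTimeNs};
}
```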
Example
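The following code writes MarketDataRow objects into the order book snapshot engine one at a time. The complete C++ code and the CSV file are provided in the attachment.

```cpp
// Read one tick-by-tick record per iteration and append it to the engine as a single row.
// csv_in, data, cnt, lastTimeStamp, engine, insertedRows, appendErrMsg and
// convertStringToTime are defined in the full program (see attachment).
while (csv_in.read_row(data.security_id, data.md_date, data.md_time, data.security_id_source,
                       data.security_type, data.index, data.source_type, data.type, data.price,
                       data.qty, data.bs_flag, data.buy_no, data.sell_no, data.appl_seq_num,
                       data.channel_no)) {
    cnt++;
    lastTimeStamp = Util::toLocalNanoTimestamp(
        std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1));
    // Constructor order: code, type, side, time, msgType, price, qty, seq,
    // buyOrder, sellOrder, receiveTime.
    ConstantSP row(new MarketDataRow(
        data.security_id, data.type, data.bs_flag,
        convertStringToTime(data.md_time)->getInt(),
        data.source_type, data.price, data.qty, data.appl_seq_num,
        data.buy_no, data.sell_no,
        lastTimeStamp));
    // Low-latency mode: exactly one MarketDataRow per append call.
    vector<ConstantSP> columns {row};
    // Refresh receiveTime immediately before append so it reflects the moment
    // the row is handed to the engine.
    static_cast<MarketDataRow*>(columns[0].get())->receiveTime = Util::toLocalNanoTimestamp(
        std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1));
    engine->append(columns, insertedRows, appendErrMsg);
}
```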
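The example stamps receiveTime immediately before append, so it records when the row is handed to the engine. A minimal standard-library sketch of the same measurement follows. nowEpochNanos returns the raw nanosecond count that the example passes to Util::toLocalNanoTimestamp. Whatever conversion that function applies is not reproduced here. latencyNs and nsToMicros are hypothetical helpers. Where the output timestamp is captured (for example, in an output handler) is an assumption.

```cpp
#include <chrono>
#include <cstdint>

// Nanoseconds since the Unix epoch from the system clock: the same raw value
// the example computes before calling Util::toLocalNanoTimestamp.
std::int64_t nowEpochNanos() {
    return std::chrono::duration_cast<std::chrono::nanoseconds>(
               std::chrono::system_clock::now().time_since_epoch())
        .count();
}

// Hypothetical helper: end-to-end latency of one row, given the receiveTime
// stamped before append and the time its result was observed downstream.
// Both values must come from the same clock and time-zone convention.
std::int64_t latencyNs(std::int64_t receiveTimeNs, std::int64_t outputTimeNs) {
    return outputTimeNs - receiveTimeNs;
}

// Convert nanoseconds to microseconds, the unit used in the performance table.
double nsToMicros(std::int64_t ns) { return static_cast<double>(ns) / 1000.0; }
```

For pure interval measurement within one process, std::chrono::steady_clock avoids wall-clock adjustments. system_clock is used here only to match the example.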
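When the order book engine feeds another low-latency engine, setting triggerType to "perRow" is recommended. The order book engine then computes and emits output for every incoming row, which satisfies the row-by-row input requirement of the downstream engine.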
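A toy cascade shows why perRow matters. When every stage emits one output per input and forwards it at once, each downstream stage also receives exactly one row per call. Row and Stage below are hypothetical stand-ins for the order book, time-series and reactive state engines. They model only the forwarding pattern, not what the engines compute.

```cpp
#include <functional>
#include <vector>

// One row flowing through the toy pipeline (hypothetical).
struct Row {
    long seq;
    double value;
};

// A stage turns one input row into one output row (a perRow-style trigger)
// and passes it on immediately, without buffering.
struct Stage {
    std::function<Row(const Row&)> compute;
    Stage* next = nullptr;              // downstream stage, if any
    std::vector<Row>* sink = nullptr;   // collects output of the last stage

    void push(const Row& in) {
        Row out = compute(in);
        if (next) {
            next->push(out);   // downstream receives exactly one row per call
        } else if (sink) {
            sink->push_back(out);
        }
    }
};
```

Usage: chain three stages and push rows into the first. Each input row produces exactly one row at the end of the chain.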
Low-Latency Mode for the Reactive State Engine
Like the order book engine, the reactive state engine supports lowLatency=true to enable low-latency mode. For details, see https://docs.dolphindb.cn/zh/funcs/c/createReactiveStateEngine.html.
In low-latency mode, note the following for the reactive state engine:
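- Row-by-row ingestion: data must be sent to the engine one row at a time, and batch writes are not allowed. This is what keeps latency to a minimum.
- Optimization of user-defined metrics: expressions passed through the metrics parameter are type-specialized and optimized, but only qualifying operators can be optimized. For the requirements, see Supported Operators for Optimization.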
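Conceptually, a reactive state engine keeps state per key. It produces a result for every incoming row by updating that state incrementally, which is why row-by-row input suits it. The sketch below illustrates the pattern with a hypothetical per-security cumulative volume and VWAP. ToyReactiveState is not the engine's implementation, and the metric is chosen only as an example.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>

// Per-security running state for the example metric.
struct State {
    std::int64_t cumQty = 0;
    double cumAmount = 0.0;
};

// One output row per input row.
struct Output {
    std::string code;
    std::int64_t cumQty;
    double vwap;
};

class ToyReactiveState {
public:
    // Called once per input row: updates only this key's state in O(1)
    // and returns the result immediately.
    Output onRow(const std::string& code, double price, std::int64_t qty) {
        State& s = states_[code];
        s.cumQty += qty;
        s.cumAmount += price * static_cast<double>(qty);
        double vwap = s.cumQty > 0 ? s.cumAmount / static_cast<double>(s.cumQty) : 0.0;
        return Output{code, s.cumQty, vwap};
    }

private:
    std::unordered_map<std::string, State> states_;  // keyed by security code
};
```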
Example
Each column is held in a length-1 vector that is created once and overwritten for every record, so each append call passes exactly one row to the engine.

```cpp
// One length-1 vector per input column, created once and reused for every row.
// csv_in, data, engine, insertedRows, appendErrMsg and the convertStringTo*
// helpers are defined elsewhere in the full program.
VectorSP col1 = Util::createVector(DT_STRING, 1, 1);   // security_id
VectorSP col2 = Util::createVector(DT_DATE, 1, 1);     // md_date
VectorSP col3 = Util::createVector(DT_TIME, 1, 1);     // md_time
VectorSP col4 = Util::createVector(DT_STRING, 1, 1);   // security_id_source
VectorSP col5 = Util::createVector(DT_STRING, 1, 1);   // security_type
VectorSP col6 = Util::createVector(DT_LONG, 1, 1);     // index
VectorSP col7 = Util::createVector(DT_INT, 1, 1);      // source_type
VectorSP col8 = Util::createVector(DT_INT, 1, 1);      // type
VectorSP col9 = Util::createVector(DT_LONG, 1, 1);     // price
VectorSP col10 = Util::createVector(DT_LONG, 1, 1);    // qty
VectorSP col11 = Util::createVector(DT_INT, 1, 1);     // bs_flag
VectorSP col12 = Util::createVector(DT_LONG, 1, 1);    // buy_no
VectorSP col13 = Util::createVector(DT_LONG, 1, 1);    // sell_no
VectorSP col14 = Util::createVector(DT_LONG, 1, 1);    // appl_seq_num
VectorSP col15 = Util::createVector(DT_INT, 1, 1);     // channel_no

while (csv_in.read_row(data.security_id, data.md_date, data.md_time, data.security_id_source,
                       data.security_type, data.index, data.source_type, data.type, data.price,
                       data.qty, data.bs_flag, data.buy_no, data.sell_no, data.appl_seq_num,
                       data.channel_no)) {
    // Overwrite element 0 of each column with the current record.
    col1->setString(0, data.security_id);
    col2->set(0, convertStringToDate(data.md_date));
    col3->set(0, convertStringToTime(data.md_time));
    col4->setString(0, data.security_id_source);
    col5->setString(0, data.security_type);
    col6->setLong(0, data.index);
    col7->setInt(0, data.source_type);
    col8->setInt(0, data.type);
    col9->setLong(0, data.price);
    col10->setLong(0, data.qty);
    col11->setInt(0, data.bs_flag);
    col12->setLong(0, data.buy_no);
    col13->setLong(0, data.sell_no);
    col14->setLong(0, data.appl_seq_num);
    col15->setInt(0, data.channel_no);
    // Low-latency mode: each append call carries exactly one row.
    vector<ConstantSP> columns {col1, col2, col3, col4, col5, col6, col7, col8,
                                col9, col10, col11, col12, col13, col14, col15};
    engine->append(columns, insertedRows, appendErrMsg);
}
```
Low-Latency Mode for the Time-Series Engine
Set lowLatency=true to enable low-latency mode for the time-series engine. For details, see createTimeSeriesEngine.
Example
The input is built in the same way as in the reactive state engine example. Each column is a length-1 vector that is overwritten for every record and appended one row at a time.

```cpp
// One length-1 vector per input column, created once and reused for every row.
// csv_in, data, engine, insertedRows, appendErrMsg and the convertStringTo*
// helpers are defined elsewhere in the full program.
VectorSP col1 = Util::createVector(DT_STRING, 1, 1);   // security_id
VectorSP col2 = Util::createVector(DT_DATE, 1, 1);     // md_date
VectorSP col3 = Util::createVector(DT_TIME, 1, 1);     // md_time
VectorSP col4 = Util::createVector(DT_STRING, 1, 1);   // security_id_source
VectorSP col5 = Util::createVector(DT_STRING, 1, 1);   // security_type
VectorSP col6 = Util::createVector(DT_LONG, 1, 1);     // index
VectorSP col7 = Util::createVector(DT_INT, 1, 1);      // source_type
VectorSP col8 = Util::createVector(DT_INT, 1, 1);      // type
VectorSP col9 = Util::createVector(DT_LONG, 1, 1);     // price
VectorSP col10 = Util::createVector(DT_LONG, 1, 1);    // qty
VectorSP col11 = Util::createVector(DT_INT, 1, 1);     // bs_flag
VectorSP col12 = Util::createVector(DT_LONG, 1, 1);    // buy_no
VectorSP col13 = Util::createVector(DT_LONG, 1, 1);    // sell_no
VectorSP col14 = Util::createVector(DT_LONG, 1, 1);    // appl_seq_num
VectorSP col15 = Util::createVector(DT_INT, 1, 1);     // channel_no

while (csv_in.read_row(data.security_id, data.md_date, data.md_time, data.security_id_source,
                       data.security_type, data.index, data.source_type, data.type, data.price,
                       data.qty, data.bs_flag, data.buy_no, data.sell_no, data.appl_seq_num,
                       data.channel_no)) {
    // Overwrite element 0 of each column with the current record.
    col1->setString(0, data.security_id);
    col2->set(0, convertStringToDate(data.md_date));
    col3->set(0, convertStringToTime(data.md_time));
    col4->setString(0, data.security_id_source);
    col5->setString(0, data.security_type);
    col6->setLong(0, data.index);
    col7->setInt(0, data.source_type);
    col8->setInt(0, data.type);
    col9->setLong(0, data.price);
    col10->setLong(0, data.qty);
    col11->setInt(0, data.bs_flag);
    col12->setLong(0, data.buy_no);
    col13->setLong(0, data.sell_no);
    col14->setLong(0, data.appl_seq_num);
    col15->setInt(0, data.channel_no);
    // Low-latency mode: each append call carries exactly one row.
    vector<ConstantSP> columns {col1, col2, col3, col4, col5, col6, col7, col8,
                                col9, col10, col11, col12, col13, col14, col15};
    engine->append(columns, insertedRows, appendErrMsg);
}
```
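A time-series engine aggregates rows over time windows. The sketch below gives a rough illustration of how row-by-row input can drive windowed output, using a hypothetical tumbling window over millisecond timestamps. Each row updates the current bar, and a finished bar is emitted as soon as a row from a later window arrives. The sketch assumes rows arrive in time order. The real engine's window alignment, closing rules and handling of empty windows are governed by its own parameters and may differ.

```cpp
#include <cstdint>
#include <vector>

// One aggregated bar for a window starting at windowStart (ms since midnight).
struct Bar {
    int windowStart;
    double open, high, low, close;
    std::int64_t volume;
};

class ToyTumblingWindow {
public:
    explicit ToyTumblingWindow(int windowMs) : windowMs_(windowMs) {}

    // Feed one row. If it belongs to a later window, the finished bar is
    // appended to out before the new bar is started.
    void onRow(int timeMs, double price, std::int64_t qty, std::vector<Bar>& out) {
        int start = timeMs - timeMs % windowMs_;
        if (hasBar_ && start != cur_.windowStart) {
            out.push_back(cur_);
            hasBar_ = false;
        }
        if (!hasBar_) {
            cur_ = Bar{start, price, price, price, price, 0};
            hasBar_ = true;
        }
        if (price > cur_.high) cur_.high = price;
        if (price < cur_.low) cur_.low = price;
        cur_.close = price;
        cur_.volume += qty;
    }

private:
    int windowMs_;
    bool hasBar_ = false;
    Bar cur_{};
};
```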
Supported Operators for Optimization
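User-defined metrics (userDefinedMetrics or metrics) may use only the operators listed below. Each operand must be a scalar or vector of one of the listed types. Otherwise, an error is raised when the engine is created.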
Operators supported for scalars
| Category | Operator | Supported types |
|---|---|---|
| Arithmetic | Addition | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE, DECIMAL64, DECIMAL128 |
|  | Subtraction | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE, DECIMAL64, DECIMAL128 |
|  | Multiplication | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Division | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Ratio | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Modulo | CHAR, SHORT, INT, LONG |
|  | Power | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
| Logical | Logical AND | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Logical OR | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Logical XOR | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
| Comparison | Equal to | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Not equal to | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Greater than | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Greater than or equal to | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Less than | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Less than or equal to | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Range check | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
| Unary | Logical NOT | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Negation | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Absolute value | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Square root | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Square | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Cube root | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Reciprocal | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Sign bit | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Sign function | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
| Trigonometric and logarithmic | Inverse hyperbolic tangent | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Natural logarithm | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | ln(1+x) | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Base-2 logarithm | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Common (base-10) logarithm | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Exponential (e^x) | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Power of 2 (2^x) | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | e^x - 1 | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Sine | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Arcsine | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Hyperbolic sine | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Inverse hyperbolic sine | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Cosine | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Hyperbolic cosine | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Arccosine | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Inverse hyperbolic cosine | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Tangent | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Hyperbolic tangent | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
|  | Arctangent | BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
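An unsupported operator or type is only reported when the engine is created, so it can help to check metric expressions against the table in advance. The sketch below encodes a few rows of the scalar table as a lookup and reports whether an operator/type combination is eligible. The DType enum and the operator keys (add, sub, mul, mod, eq, gt) are illustrative choices, not Swordfish identifiers, and only a subset of rows is covered. Each operand type is checked individually because the table does not say how mixed-type operands are handled.

```cpp
#include <initializer_list>
#include <map>
#include <set>
#include <string>

// Operand types named after the table entries (illustrative, not a Swordfish type).
enum class DType { BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE, DECIMAL64, DECIMAL128 };

// A subset of the scalar table: operator key -> operand types eligible for optimization.
const std::map<std::string, std::set<DType>>& scalarSupport() {
    using D = DType;
    static const std::map<std::string, std::set<DType>> table = {
        {"add", {D::BOOL, D::CHAR, D::SHORT, D::INT, D::LONG, D::FLOAT, D::DOUBLE,
                 D::DECIMAL64, D::DECIMAL128}},
        {"sub", {D::CHAR, D::SHORT, D::INT, D::LONG, D::FLOAT, D::DOUBLE,
                 D::DECIMAL64, D::DECIMAL128}},
        {"mul", {D::CHAR, D::SHORT, D::INT, D::LONG, D::FLOAT, D::DOUBLE}},
        {"mod", {D::CHAR, D::SHORT, D::INT, D::LONG}},
        {"eq",  {D::BOOL, D::CHAR, D::SHORT, D::INT, D::LONG, D::FLOAT, D::DOUBLE}},
        {"gt",  {D::CHAR, D::SHORT, D::INT, D::LONG, D::FLOAT, D::DOUBLE}},
    };
    return table;
}

// True if the operator is known and every operand type is listed for it.
bool isOptimizable(const std::string& op, std::initializer_list<DType> operandTypes) {
    auto it = scalarSupport().find(op);
    if (it == scalarSupport().end()) return false;
    for (DType t : operandTypes) {
        if (it->second.count(t) == 0) return false;
    }
    return true;
}
```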
Operators supported for vectors
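Category | Operator | Supported types |
---|---|---|
Addition | Addition | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
Subtraction | Subtraction | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
Multiplication | Multiplication | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
Division | Division | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
Ratio | Ratio | CHAR, SHORT, INT, LONG, FLOAT, DOUBLE |
Other | Element access (at): accesses a vector element by index; the left operand is a vector and the right operand is a single scalar index | |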
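For vectors, the table lists only element-wise arithmetic and element access with a scalar index. The sketch below spells out those semantics in plain C++ for DOUBLE vectors. A binary operation combines two vectors element by element, and at takes a vector on the left and one scalar index on the right. The table does not say how length mismatches or out-of-range indices are handled. Rejecting them with exceptions is an assumption of this sketch, and elementwise and at are hypothetical helper names.

```cpp
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <vector>

using Vec = std::vector<double>;

// Element-wise binary operation on two equal-length vectors.
Vec elementwise(const Vec& a, const Vec& b, const std::function<double(double, double)>& op) {
    if (a.size() != b.size()) throw std::invalid_argument("length mismatch");
    Vec out(a.size());
    for (std::size_t i = 0; i < a.size(); ++i) out[i] = op(a[i], b[i]);
    return out;
}

// Element access: a vector on the left, a single scalar index on the right.
double at(const Vec& v, std::size_t index) {
    if (index >= v.size()) throw std::out_of_range("index out of range");
    return v[index];
}
```

Usage: elementwise(ask, bid, std::minus<double>()) yields the per-level spread, and at(spread, 0) reads the first level.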
Performance Test
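To measure the actual performance of Swordfish's low-latency mode, we ran an end-to-end latency test on a cascade of engines (OrderBookSnapshotEngine → TimeSeriesEngine → ReactiveStateEngine). Low-latency mode was enabled on all engines, and the input was data for a single stock.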
Test environment:
- Processor: Intel(R) Core(TM) i9-14900KS
- Maximum CPU frequency: 5800 MHz
Statistic | Latency (μs) |
---|---|
Minimum | 2.876 |
Maximum | 36.927 |
Mean | 4.56676 |
Median | 3.121 |
P90 | 6.417 |
P99 | 36.927 |
P999 | 36.927 |
P9999 | 36.927 |
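The table summarizes per-row end-to-end latency. The source states neither the sample count nor the percentile definition. The sketch below assumes the nearest-rank definition and shows how such a summary can be computed from latency samples in microseconds. LatencySummary, nearestRank and summarize are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <numeric>
#include <vector>

struct LatencySummary {
    double min, max, mean, median, p90, p99, p999, p9999;
};

// Nearest-rank percentile on sorted data: the smallest sample such that at
// least a fraction p of all samples are <= it (p in (0, 1]).
double nearestRank(const std::vector<double>& sorted, double p) {
    std::size_t rank = static_cast<std::size_t>(std::ceil(p * sorted.size()));
    if (rank == 0) rank = 1;
    return sorted[rank - 1];
}

LatencySummary summarize(std::vector<double> samplesUs) {
    assert(!samplesUs.empty());
    std::sort(samplesUs.begin(), samplesUs.end());
    const std::size_t n = samplesUs.size();
    double sum = std::accumulate(samplesUs.begin(), samplesUs.end(), 0.0);
    // Median: middle element, or the mean of the two middle elements for even n.
    double median = (n % 2 == 1) ? samplesUs[n / 2]
                                 : (samplesUs[n / 2 - 1] + samplesUs[n / 2]) / 2.0;
    return LatencySummary{samplesUs.front(), samplesUs.back(), sum / n, median,
                          nearestRank(samplesUs, 0.90), nearestRank(samplesUs, 0.99),
                          nearestRank(samplesUs, 0.999), nearestRank(samplesUs, 0.9999)};
}
```

With nearest-rank percentiles, P99 equals the maximum whenever fewer than 100 samples are taken. Likewise, P999 equals the maximum below 1,000 samples, and P9999 below 10,000.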