低延时实时计算 API 使用指南

在订单簿构建、分钟频行情因子计算、逐笔行情因子分析等场景中,数据的实时性、准确性和处理效率直接决定了策略的执行效果与风险控制能力。交易所逐笔行情数据具有高频、高并发、大容量的特征,单条数据的延迟可能引发整体计算链路的阻塞,导致订单簿更新滞后、因子计算失准,甚至触发误判的风控信号。例如:

  • 在订单簿构建中,若买卖盘深度无法实时同步,市场情绪分析将滞后,可能错失交易机会。

  • 在持仓监控中,延迟的实时计算可能掩盖交易异常行为,增加系统性风险。

为应对这些挑战,Swordfish 提供低延时实时计算架构,通过单条数据即时处理分布式计算优化,实现高精度、低延迟数据处理。通过设置 lowLatency 参数开启低延时计算模式,从而优化数据处理延迟和计算效率。

低延时计算模式主要适用于以下三类流计算引擎:createOrderBookSnapshotEnginecreateReactiveStateEnginecreateTimeSeriesEngine。本文将重点演示如何使用这三类引擎开启低延时模式,并通过示例展示逐条写入数据的方法,帮助用户快速掌握低延时实时计算能力。

订单簿快照引擎低延时模式

通过指定 lowLatency=true,可以开启订单簿快照引擎的低延时计算模式。更多函数说明,请参考 createOrderBookSnapshotEngine

在低延时模式下,需要注意以下几点:

  • 逐条写入:数据必须一条一条发送到引擎,不能批量写入,以保证最低延迟。
  • 用户定义指标(metrics)优化userDefinedMetrics 传入的表达式会进行类型特化与表达式优化,但只有符合条件的运算符才能被优化。具体要求请参考支持优化的运算符
  • 写入数据结构:要求按照市场数据结构写入数据,以减少开销。Swordfish 提供了 MarketDataRow 类,用于封装交易所逐笔行情的核心字段。用户可以通过创建 MarketDataRow 类对象,并逐条写入引擎来满足低延时模式的数据要求。MarketDataRow 类定义如下:
    class MarketDataRow : public Constant {
    public:
        MarketDataRow() : Constant() {
        }
        MarketDataRow(std::string &code, int type, int side, int time, int msgType, int64_t price, int64_t qty,
                      int64_t seq, int64_t buyOrder, int64_t sellOrder, int64_t receiveTime)
            : Constant(), code(code), type(type), side(side), time(time), msgType(msgType),
              price(price), qty(qty), seq(seq), buyOrder(buyOrder), sellOrder(sellOrder), receiveTime(receiveTime)
               {}
        virtual ~MarketDataRow(){};
        virtual DATA_TYPE getRawType() const {return DT_DICTIONARY;}
        virtual ConstantSP getInstance() const {return getValue();}
        virtual ConstantSP getValue() const {
            return ConstantSP(new MarketDataRow(*this));
        }
    
        DolphinString code;
        int type;
        int side;
        int time;
        int msgType;
        int64_t price;
        int64_t qty;
        int64_t seq;
        int64_t buyOrder;
        int64_t sellOrder;
        int64_t receiveTime; // When engine receives the message, in nanoseconds.
    };

使用示例

以下示例演示如何从 CSV 文件读取逐条行情数据,并构造 MarketDataRow 对象逐条写入订单簿快照引擎。完整 C++ 代码及 CSV 文件见附件。
while(csv_in.read_row(data.security_id, data.md_date, data.md_time, data.security_id_source, data.security_type, data.index, data.source_type, data.type, data.price, data.qty, data.bs_flag, data.buy_no, data.sell_no, data.appl_seq_num, data.channel_no)) {
    cnt++;
    lastTimeStamp = Util::toLocalNanoTimestamp(std::chrono::system_clock::now().time_since_epoch() /std::chrono::nanoseconds(1));
    ConstantSP row(new MarketDataRow(
            data.security_id, data.type, data.bs_flag,
            convertStringToTime(data.md_time)->getInt(),
            data.source_type, data.price, data.qty, data.appl_seq_num,
            data.buy_no, data.sell_no,
            lastTimeStamp)
    );
    vector<ConstantSP> columns {row};
    ((MarketDataRow*)columns[0].get())->receiveTime = Util::toLocalNanoTimestamp(std::chrono::system_clock::now().time_since_epoch() /std::chrono::nanoseconds(1));
    engine->append(columns, insertedRows, appendErrMsg);
}
Note:

当订单簿引擎的下游为其它低延时计算引擎时,triggerType 建议设置为 "perRow",使得订单簿引擎每收到一行数据就计算输出一次,以符合单条写入下游计算引擎的要求。

响应式状态引擎低延时模式

和订单簿引擎类似,响应式状态引擎也可以指定 lowLatency=true,开启低延时计算模式。更多函数说明,请参考 https://docs.dolphindb.cn/zh/funcs/c/createReactiveStateEngine.html

在低延时模式下,响应式状态引擎需要注意以下几点:

  • 逐条写入:数据必须一条一条发送到引擎,不能批量写入,以保证最低延迟。
  • 用户定义指标(metrics)优化metrics 参数传入的表达式会进行类型特化与表达式优化,但只有符合条件的运算符才能被优化。具体要求请参考支持优化的运算符

使用示例

VectorSP col1 = Util::createVector(DT_STRING, 1, 1);
VectorSP col2 = Util::createVector(DT_DATE, 1, 1);
VectorSP col3 = Util::createVector(DT_TIME, 1, 1);
VectorSP col4 = Util::createVector(DT_STRING, 1, 1);
VectorSP col5 = Util::createVector(DT_STRING, 1, 1);
VectorSP col6 = Util::createVector(DT_LONG, 1, 1);
VectorSP col7 = Util::createVector(DT_INT, 1, 1);
VectorSP col8 = Util::createVector(DT_INT, 1, 1);
VectorSP col9 = Util::createVector(DT_LONG, 1, 1);
VectorSP col10 = Util::createVector(DT_LONG, 1, 1);
VectorSP col11 = Util::createVector(DT_INT, 1, 1);
VectorSP col12 = Util::createVector(DT_LONG, 1, 1);
VectorSP col13 = Util::createVector(DT_LONG, 1, 1);
VectorSP col14 = Util::createVector(DT_LONG, 1, 1);
VectorSP col15 = Util::createVector(DT_INT, 1, 1);

while(csv_in.read_row(data.security_id, data.md_date, data.md_time, data.security_id_source, data.security_type, data.index, data.source_type, data.type, data.price, data.qty, data.bs_flag, data.buy_no, data.sell_no, data.appl_seq_num, data.channel_no)) {
    col1->setString(0, data.security_id);
    col2->set(0, convertStringToDate(data.md_date));
    col3->set(0, convertStringToTime(data.md_time));
    col4->setString(0, data.security_id_source);
    col5->setString(0, data.security_type);
    col6->setLong(0, data.index);
    col7->setInt(0, data.source_type);
    col8->setInt(0, data.type);
    col9->setLong(0, data.price);
    col10->setLong(0, data.qty);
    col11->setInt(0, data.bs_flag);
    col12->setLong(0, (data.buy_no));
    col13->setLong(0, (data.sell_no));
    col14->setLong(0, (data.appl_seq_num));
    col15->setInt(0, (data.channel_no));        
    vector<ConstantSP> columns {col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15};
    engine->append(columns, insertedRows, appendErrMsg);
}

时间序列引擎低延时模式

通过指定 lowLatency=true,可以开启时间序列引擎的低延时计算模式。更多函数说明,请参考

https://docs.dolphindb.cn/zh/funcs/c/createTimeSeriesEngine.html。数据必须一条一条写入引擎,示例如下:

VectorSP col1 = Util::createVector(DT_STRING, 1, 1);
VectorSP col2 = Util::createVector(DT_DATE, 1, 1);
VectorSP col3 = Util::createVector(DT_TIME, 1, 1);
VectorSP col4 = Util::createVector(DT_STRING, 1, 1);
VectorSP col5 = Util::createVector(DT_STRING, 1, 1);
VectorSP col6 = Util::createVector(DT_LONG, 1, 1);
VectorSP col7 = Util::createVector(DT_INT, 1, 1);
VectorSP col8 = Util::createVector(DT_INT, 1, 1);
VectorSP col9 = Util::createVector(DT_LONG, 1, 1);
VectorSP col10 = Util::createVector(DT_LONG, 1, 1);
VectorSP col11 = Util::createVector(DT_INT, 1, 1);
VectorSP col12 = Util::createVector(DT_LONG, 1, 1);
VectorSP col13 = Util::createVector(DT_LONG, 1, 1);
VectorSP col14 = Util::createVector(DT_LONG, 1, 1);
VectorSP col15 = Util::createVector(DT_INT, 1, 1);

while(csv_in.read_row(data.security_id, data.md_date, data.md_time, data.security_id_source, data.security_type, data.index, data.source_type, data.type, data.price, data.qty, data.bs_flag, data.buy_no, data.sell_no, data.appl_seq_num, data.channel_no)) {
    col1->setString(0, data.security_id);
    col2->set(0, convertStringToDate(data.md_date));
    col3->set(0, convertStringToTime(data.md_time));
    col4->setString(0, data.security_id_source);
    col5->setString(0, data.security_type);
    col6->setLong(0, data.index);
    col7->setInt(0, data.source_type);
    col8->setInt(0, data.type);
    col9->setLong(0, data.price);
    col10->setLong(0, data.qty);
    col11->setInt(0, data.bs_flag);
    col12->setLong(0, (data.buy_no));
    col13->setLong(0, (data.sell_no));
    col14->setLong(0, (data.appl_seq_num));
    col15->setInt(0, (data.channel_no));        
    vector<ConstantSP> columns {col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15};
    engine->append(columns, insertedRows, appendErrMsg);
}

支持优化的运算符

用户自定义指标(userDefinedMetricsmetrics)仅支持下述的二元运算符,且运算数必须为符合下述类型的标量或向量,否则会在创建引擎时将会报错。

标量常量支持的运算符

类别 运算符 支持类型
算术运算 加法 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE, DECIMAL64, DECIMAL128
减法 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE, DECIMAL64, DECIMAL128
乘法 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
除法 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
比例 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
取模 CHAR, SHORT, INT, LONG
幂运算 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
逻辑运算 逻辑与 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
逻辑或 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
逻辑异或 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
比较运算 等于 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
不等于 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
大于 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
大于等于 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
小于 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
小于等于 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
区间判断 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
一元运算 逻辑非 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
取负 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
绝对值 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
平方根 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
平方 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
立方根 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
倒数 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
符号位 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
符号函数 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
三角函数与对数函数 反双曲正切 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
自然对数 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
ln(1+x) BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
以2为底对数 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
常用对数 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
指数函数 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
2的幂 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
e^x - 1 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
正弦 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
反正弦 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
双曲正弦 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
反双曲正弦 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
余弦 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
双曲余弦 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
反余弦 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
反双曲余弦 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
正切 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
双曲正切 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
反正切 BOOL, CHAR, SHORT, INT, LONG, FLOAT, DOUBLE

向量常量支持的运算符

类别 运算符 支持类型
加法运算 加法 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
减法运算 减法 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
乘法运算 乘法 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
除法运算 除法 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
比例运算 比例 CHAR, SHORT, INT, LONG, FLOAT, DOUBLE
其他操作 元素访问(at),按索引访问向量元素,左操作数为向量,右操作数为单个标量索引

性能测试

为验证 Swordfish 低延时计算模式的实际性能,我们进行了引擎级联的端到端延迟测试(OrderBookSnapshotEngine → TimeSeriesEngine → ReactiveStateEngine)。所有引擎均开启低延时计算模式,输入单只股票数据。

测试环境

  • 机器:Intel(R) Core(TM) i9-14900KS
  • CPU 最大频率:5800 MHz
场景 单位(微秒)
最小值 2.876
最大值 36.927
平均值 4.56676
中位数 3.121
P90 6.417
P99 36.927
P999 36.927
P9999 36.927