状态函数计算库

在头文件 swordfish.h 中,声明了名为 ReactiveStateFactory 的结构体,包含了多个静态函数,用于实现各种可用于响应式状态引擎的状态算子。状态算子用于有状态计算,意味着在计算过程中,其输出不仅取决于当前输入,还受到先前状态或历史记录的影响。状态算子在处理当前输入后,会更新其内部状态,并将该状态用于后续计算。

状态函数特性及其应用

状态函数计算库是 Swordfish 的重要组成部分,具有以下特性:

  • 增量计算:状态函数能够根据当前输入和之前的状态进行增量计算,避免了重复计算,提高了计算性能。

  • 分组计算:支持对数据进行分组计算,对分组应用计算规则。

  • 多种算子:支持多种类型的算子,包括:TA-Lib 函数、累计窗口函数、行计算函数、序列函数、topN 函数、高阶函数。

基于以上特性,响应式状态函数适用于对单条数据即时响应并实时更新输出计算结果的高频计算场景,例如:

  • 金融:实时计算高频因子,如基于快照数据计算股票的短时涨幅等。

  • 物联网:检测传感器状态的变化,如实时监控温度、湿度、压力等数据指标的波动。

状态函数使用

函数说明

因为 ReactiveStateFactory 中状态函数的参数形式一致,这里仅以 createMsumReactiveState 为例进行说明,其函数声明如下:

createMsumReactiveState(const vector<ObjectSP>& args, 
const vector<int>& inputColIndices, const vector<DATA_TYPE>& inputColTypes, 
const vector<int>& outputColIndices)

该函数实现了 msum 的功能,用于处理 TA-Lib 计算中空值。其参数列表如下:

args:传入的参数列表。这里需要按照 msum 函数的要求传入参数。

inputColIndices:计算输入列的索引。

inputColTypes:计算输入列的类型。

outputColIndices:输出列的索引。

用户在代码中须通过 #include 指令获取头文件 Swordfish.h,且使用前应通过 DolphinDBLib::initializeRuntime() 初始化运行时,使用后通过 DolphinDBLib::finalizeRuntime() 关闭运行时。

下文将分别举例介绍几种常用引擎的用法。为保持代码简洁,以下的预处理器编译指令和运行时相关代码将不再重复展示,仅展示具体函数的实现,即以下代码 XXX 部分。

#include "Swordfish.h"
int main()
{
    DolphinDBLib::initializeRuntime();
    XXX
    DolphinDBLib::finalizeRuntime();
    return 0;
}

示例

本示例以创建一个状态引擎的 msum 算子为例,介绍状态函数的使用方法。

在代码中必须通过 #include 指令获取头文件 Swordfish.h。在使用状态函数前,必须通过 DolphinDBLib::initializeRuntime() 初始化运行时,在使用状态函数后通过 DolphinDBLib::finalizeRuntime() 关闭运行时。

#include "Swordfish.h"
int main()
{
    DolphinDBLib::initializeRuntime();
    // 代码实现
    DolphinDBLib::finalizeRuntime();
    return 0;
}

下例创建一个 msumReactiveState 算子,用于增量计算给定长度的移动窗口内的元素总和。实现流程如下:

  1. 创建 DolphinDB 会话,以便在会话中执行 DolphinDB 的内置函数。

  2. 创建状态表,用于存储 val 和 msum 的值。

  3. 创建参数数组 args,第一个参数为 val 列的引用,第二个参数为窗口长度 3,用于计算移动求和。

  4. 使用 ReactiveStateFactory::createMsumReactiveState 创建一个移动求和的状态算子 msumReactiveState

  5. 向状态表中插入初始值(DBL_NMIN),表示移动求和的初始状态。

  6. 创建一个空的结果向量 msumResult,用于存储计算结果。

  7. 定义输入数据 valData,后续会对这些数值进行移动求和。

  8. 更新状态表,并调用 msumReactiveState->append() 方法进行增量计算。从状态表中获取当前索引的移动求和结果,并将其添加到结果向量 msumResult 中。

  9. 输出最终的移动求和结果。

#include "Swordfish.h"
int main() {
    DolphinDBLib::initializeRuntime();
    SessionSP session = DolphinDBLib::createSession();
    // Calculate msum(val, 3) context by sym
    vector<string> colNames = {"val", "msum"};
    vector<DATA_TYPE> colTypes = { DT_DOUBLE, DT_DOUBLE};
    TableSP stateTable = Util::createTable(colNames, colTypes, 0, 1);
    vector<ObjectSP> args(2);
    SQLContextSP context = new SQLContext();
    args[0] = new ColumnRef(context, "val");
    // Set window = 3
    args[1] = new Int(3); 
    // Set input column index in stateTable
    vector<int> inputColIndices = {0}; 
    // Set input column data type in stateTable
    vector<DATA_TYPE> inputColTypes = {DT_DOUBLE};
    // Set output column index in stateTable
    vector<int> outputColIndices = {1}; 
    // Create msum reactive state
    ReactiveStateSP msumReactiveState = ReactiveStateFactory::createMsumReactiveState(args, inputColIndices, inputColTypes, outputColIndices);
    msumReactiveState->setTable(stateTable);
    msumReactiveState->setSession(session);
    context->setTable(stateTable);
    INDEX rowInserted = 0;
    string errMsg;
    std::vector<ConstantSP> rowValues = {new Double(DBL_NMIN), new Double(DBL_NMIN)};
    if(!stateTable->append(rowValues, rowInserted, errMsg)){
        throw RuntimeException("Failed to append data to state table with error: " + errMsg);
    }
    msumReactiveState->addKeys(1);
    // Create result VectorSP
    VectorSP msumResult = Util::createVector(DT_DOUBLE, 0);
    // Create input data
    std::vector<double> valData = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0};

    INDEX rows = valData.size();
    ConstantSP val = new Double();
    std::vector<ConstantSP> updateValue = {val};
    std::vector<string> updateColName = {"val"};
    VectorSP index = Util::createIndexVector(0,1);
    // Iterate through each element of valCol
    for(int i = 0; i < rows; i++){
        // Get the val element
        val->setDouble(valData[i]);
        if(!stateTable->update(updateValue, index, updateColName, errMsg)){
            throw RuntimeException("Failed to update state table with error: " + errMsg);
        }
        // Incrementally calculate msum
        msumReactiveState->append(session->getHeap().get(), index);
        // Get the result from stateTable for the current index
        ConstantSP res = stateTable->getColumn(1)->get(index);
        msumResult->append(res);
    }

    std::cout<<" msum result"<<std::endl;
    std::cout << msumResult->getString() << std::endl;
    DolphinDBLib::finalizeRuntime();
    return 0;
}

API 参考

本节给出 Swordfish 中的状态函数与 DolphinDB 内置函数的对应表。有关函数功能的详细解释,请参考 DolphinDB。

Swordfish 状态函数 DolphinDB 函数
createTalibNullReactiveState talibNull
createTalibReactiveState talib
createKamaReactiveState kama
createSmaReactiveState sma
createWmaReactiveState wma
createMaReactiveState ma
createDemaReactiveState dema
createTemaReactiveState tema
createTrimaReactiveState trima
createT3ReactiveState t3
createLinearTimeTrendReactiveState LinearTimeTrend
createEmaReactiveState ema
createWilderReactiveState wilder
createGemaReactiveState gema
createMoveReactiveState move
createPrevReactiveState prev
createDeltasReactiveState deltas
createRatiosReactiveState ratios
createPercentChangeReactiveState percentChange
createFfillReactiveState ffill
createIterateReactiveState iterate
createEwmMeanReactiveState ewmMean
createEwmVarReactiveState ewmVar
createEwmStdReactiveState ewmStd
createEwmCovReactiveState ewmCov
createEwmCorrReactiveState ewmCorr
createMcountReactiveState mcount
createMavgReactiveState mavg
createMsumReactiveState msum
createMprodReactiveState mprod
createMvarReactiveState mvar
createMvarpReactiveState mvarp
createMstdReactiveState mstd
createMstdpReactiveState mstdp
createMskewReactiveState mskew
createMkurtosisReactiveState mkurtosis
createMminReactiveState mmin
createMmaxReactiveState mmax
createMiminReactiveState mimin
createMimaxReactiveState mimax
createMfirstReactiveState mfirst
createMlastReactiveState mlast
createMmedReactiveState mmed
createMpercentileReactiveState mpercentile
createMrankReactiveState mrank
createMcorrReactiveState mcorr
createMcovarReactiveState mcovar
createMbetaReactiveState mbeta
createMwsumReactiveState mwsum
createMwavgReactiveState mwavg
createMslrReactiveState mslr
createCumavgReactiveState cumavg
createCumsumReactiveState cumsum
createCumprodReactiveState cumprod
createCumcountReactiveState cumcount
createCumvarReactiveState cumvar
createCumminReactiveState cummin
createCummaxReactiveState cummax
createCumvarpReactiveState cumvarp
createCumstdReactiveState cumstd
createCumstdpReactiveState cumstdp
createCumcorrReactiveState cumcorr
createCumcovarReactiveState cumcovar
createCumbetaReactiveState cumbeta
createCumwsumReactiveState cumwsum
createCumwavgReactiveState cumwavg
createCumfirstNotReactiveState cumfirstNot
createCumlastNotReactiveState cumlastNot
createCummedReactiveState cummed
createCumpercentileReactiveState cumpercentile
createCumPositiveStreakReactiveState cumPositiveStreak
createCumnuniqueReactiveState cumnunique
createMovingReactiveState moving
createWindowReactiveState window
createTMoveReactiveState tmove
createTMovingsumReactiveState tmsum
createTMovingsum2ReactiveState tmsum2
createTMovingminReactiveState tmmin
createTMovingmaxReactiveState tmmax
createTMovingrankReactiveState tmrank
createTMovingmedReactiveState tmmed
createTMovingpercentileReactiveState tmpercentile
createTMovingprodReactiveState tmprod
createTMovingcorrReactiveState tmcorr
createTMovingbetaReactiveState tmbeta
createTMovingcovarReactiveState tmcovar
createTMovingwsumReactiveState tmwsum
createTMovingwavgReactiveState tmwavg
createTMovingskewReactiveState tmskew
createTMovingkurtosisReactiveState tmkurtosis
createTMovingavgReactiveState tmavg
createTMovingcountReactiveState tmcount
createTMovingvarReactiveState tmvar
createTMovingvarpReactiveState tmvarp
createTMovingstdReactiveState tmstd
createTMovingstdpReactiveState tmstdp
createTMovingfirstReactiveState tmfirst
createTMovinglastReactiveState tmlast
createTMovingReactiveState tmoving
createMmadReactiveState mmad
createCumsumTopNReactiveState cumsumTopN
createCumavgTopNReactiveState cumavgTopN
createCumstdTopNReactiveState cumstdTopN
createCumstdpTopNReactiveState cumstdpTopN
createCumvarTopNReactiveState cumvarTopN
createCumvarpTopNReactiveState cumvarpTopN
createCumbetaTopNReactiveState cumbetaTopN
createCumcorrTopNReactiveState cumcorrTopN
createCumcovarTopNReactiveState cumvarTopN
createCumwsumTopNReactiveState cumwsumTopN
createCumskewTopNReactiveState cumskewTopN
createCumkurtosisTopNReactiveState cumkurtosisTopN
createMsumTopNReactiveState cumsumTopN
createMavgTopNReactiveState mavgTopN
createMstdpTopNReactiveState mstdpTopN
createMstdTopNReactiveState mstdTopN
createMvarpTopNReactiveState mvarpTopN
createMvarTopNReactiveState mvarTopN
createMcorrTopNReactiveState mcorrTopN
createMbetaTopNReactiveState mbetaTopN
createMcovarTopNReactiveState mcovarTopN
createMwsumTopNReactiveState mwsumTopN
createMskewTopNReactiveState mskewTopN
createMkurtosisTopNReactiveState mkurtosisTopN
createTMovingsumTopNReactiveState tmsumTopN
createTMovingavgTopNReactiveState tmavgTopN
createTMovingstdpTopNReactiveState tmstdpTopN
createTMovingstdTopNReactiveState tmstdTopN
createTMovingvarpTopNReactiveState tmvarpTopN
createTMovingvarTopNReactiveState tmvarTopN
createTMovingskewTopNReactiveState tmskewTopN
createTMovingkurtosisTopNReactiveState tmkurtosisTopN
createTMovingcorrTopNReactiveState tmcorrTopN
createTMovingbetaTopNReactiveState tmbetaTopN
createTMovingcovarTopNReactiveState tmcovarTopN
createTMovingwsumTopNReactiveState tmwsumTopN
createDynamicGroupCumcountReactiveState dynamicGroupCumcount
createDynamicGroupCumsumReactiveState dynamicGroupCumsum
createSegmentbyReactiveState segmentby
createPrevStateReactiveState prevState
createTopRangeReactiveState topRange
createLowRangeReactiveState lowRange
createMmaxPositiveStreakReactiveState mmaxPositiveStreak
createSumbarsReactiveState sumbars
createTrueRangeReactiveState trueRange
createConditionalIterateReactiveState conditionalIterate
createStateIterateReactiveState stateIterate
createMovingWindowDataReactiveState moving
createTMovingWindowDataReactiveState tmoving
createGenericStateIterateReactiveState genericStateIterate
createGenericTStateIterateReactiveState genericTStateIterate
createAccumulateReactiveState accumulate