状态函数计算库
在头文件 swordfish.h 中,声明了名为 ReactiveStateFactory
的结构体,包含了多个静态函数,用于实现各种可用于响应式状态引擎的状态算子。状态算子用于有状态计算,意味着在计算过程中,其输出不仅取决于当前输入,还受到先前状态或历史记录的影响。状态算子在处理当前输入后,会更新其内部状态,并将该状态用于后续计算。
状态函数特性及其应用
状态函数计算库是 Swordfish 的重要组成部分,具有以下特性:
-
增量计算:状态函数能够根据当前输入和之前的状态进行增量计算,避免了重复计算,提高了计算性能。
-
分组计算:支持对数据进行分组计算,对分组应用计算规则。
-
多种算子:支持多种类型的算子,包括:TA-Lib 函数、累计窗口函数、行计算函数、序列函数、topN 函数、高阶函数。
基于以上特性,响应式状态函数适用于对单条数据即时响应并实时更新输出计算结果的高频计算场景,例如:
-
金融:实时计算高频因子,如基于快照数据计算股票的短时涨幅等。
-
物联网:检测传感器状态的变化,如实时监控温度、湿度、压力等数据指标的波动。
状态函数使用
函数说明
因为 ReactiveStateFactory 中状态函数的参数形式一致,这里仅以
createMsumReactiveState 为例进行说明,其函数声明如下:
createMsumReactiveState(const vector<ObjectSP>& args,
const vector<int>& inputColIndices, const vector<DATA_TYPE>& inputColTypes,
const vector<int>& outputColIndices)
该函数实现了 msum 的功能,用于处理 TA-Lib 计算中空值。其参数列表如下:
args:传入的参数列表。这里需要按照 msum 函数的要求传入参数。
inputColIndices:计算输入列的索引。
inputColTypes:计算输入列的类型。
outputColIndices:输出列的索引。
用户在代码中须通过 #include 指令获取头文件
Swordfish.h,且使用前应通过
DolphinDBLib::initializeRuntime() 初始化运行时,使用后通过
DolphinDBLib::finalizeRuntime() 关闭运行时。
下文将分别举例介绍几种常用引擎的用法。为保持代码简洁,以下的预处理器编译指令和运行时相关代码将不再重复展示,仅展示具体函数的实现,即以下代码
XXX 部分。
#include "Swordfish.h"
int main()
{
DolphinDBLib::initializeRuntime();
XXX
DolphinDBLib::finalizeRuntime();
return 0;
}
示例
本示例以创建一个状态引擎的 msum 算子为例,介绍状态函数的使用方法。
在代码中必须通过 #include 指令获取头文件 Swordfish.h。在使用状态函数前,必须通过
DolphinDBLib::initializeRuntime() 初始化运行时,在使用状态函数后通过
DolphinDBLib::finalizeRuntime() 关闭运行时。
#include "Swordfish.h"
int main()
{
DolphinDBLib::initializeRuntime();
// 代码实现
DolphinDBLib::finalizeRuntime();
return 0;
}
下例创建一个 msumReactiveState 算子,用于增量计算给定长度的移动窗口内的元素总和。实现流程如下:
-
创建 DolphinDB 会话,以便在会话中执行 DolphinDB 的内置函数。
-
创建状态表,用于存储 val 和 msum 的值。
-
创建参数数组 args,第一个参数为 val 列的引用,第二个参数为窗口长度 3,用于计算移动求和。
-
使用
ReactiveStateFactory::createMsumReactiveState创建一个移动求和的状态算子msumReactiveState。 -
向状态表中插入初始值(DBL_NMIN),表示移动求和的初始状态。
-
创建一个空的结果向量 msumResult,用于存储计算结果。
-
定义输入数据 valData,后续会对这些数值进行移动求和。
-
更新状态表,并调用
msumReactiveState->append()方法进行增量计算。从状态表中获取当前索引的移动求和结果,并将其添加到结果向量msumResult中。 -
输出最终的移动求和结果。
#include "Swordfish.h"
int main() {
DolphinDBLib::initializeRuntime();
SessionSP session = DolphinDBLib::createSession();
// Calculate msum(val, 3) context by sym
vector<string> colNames = {"val", "msum"};
vector<DATA_TYPE> colTypes = { DT_DOUBLE, DT_DOUBLE};
TableSP stateTable = Util::createTable(colNames, colTypes, 0, 1);
vector<ObjectSP> args(2);
SQLContextSP context = new SQLContext();
args[0] = new ColumnRef(context, "val");
// Set window = 3
args[1] = new Int(3);
// Set input column index in stateTable
vector<int> inputColIndices = {0};
// Set input column data type in stateTable
vector<DATA_TYPE> inputColTypes = {DT_DOUBLE};
// Set output column index in stateTable
vector<int> outputColIndices = {1};
// Create msum reactive state
ReactiveStateSP msumReactiveState = ReactiveStateFactory::createMsumReactiveState(args, inputColIndices, inputColTypes, outputColIndices);
msumReactiveState->setTable(stateTable);
msumReactiveState->setSession(session);
context->setTable(stateTable);
INDEX rowInserted = 0;
string errMsg;
std::vector<ConstantSP> rowValues = {new Double(DBL_NMIN), new Double(DBL_NMIN)};
if(!stateTable->append(rowValues, rowInserted, errMsg)){
throw RuntimeException("Failed to append data to state table with error: " + errMsg);
}
msumReactiveState->addKeys(1);
// Create result VectorSP
VectorSP msumResult = Util::createVector(DT_DOUBLE, 0);
// Create input data
std::vector<double> valData = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0};
INDEX rows = valData.size();
ConstantSP val = new Double();
std::vector<ConstantSP> updateValue = {val};
std::vector<string> updateColName = {"val"};
VectorSP index = Util::createIndexVector(0,1);
// Iterate through each element of valCol
for(int i = 0; i < rows; i++){
// Get the val element
val->setDouble(valData[i]);
if(!stateTable->update(updateValue, index, updateColName, errMsg)){
throw RuntimeException("Failed to update state table with error: " + errMsg);
}
// Incrementally calculate msum
msumReactiveState->append(session->getHeap().get(), index);
// Get the result from stateTable for the current index
ConstantSP res = stateTable->getColumn(1)->get(index);
msumResult->append(res);
}
std::cout<<" msum result"<<std::endl;
std::cout << msumResult->getString() << std::endl;
DolphinDBLib::finalizeRuntime();
return 0;
}
API 参考
本节给出 Swordfish 中的状态函数与 DolphinDB 内置函数的对应表。有关函数功能的详细解释,请参考 DolphinDB。
| Swordfish 状态函数 | DolphinDB 函数 |
|---|---|
| createTalibNullReactiveState | talibNull |
| createTalibReactiveState | talib |
| createKamaReactiveState | kama |
| createSmaReactiveState | sma |
| createWmaReactiveState | wma |
| createMaReactiveState | ma |
| createDemaReactiveState | dema |
| createTemaReactiveState | tema |
| createTrimaReactiveState | trima |
| createT3ReactiveState | t3 |
| createLinearTimeTrendReactiveState | LinearTimeTrend |
| createEmaReactiveState | ema |
| createWilderReactiveState | wilder |
| createGemaReactiveState | gema |
| createMoveReactiveState | move |
| createPrevReactiveState | prev |
| createDeltasReactiveState | deltas |
| createRatiosReactiveState | ratios |
| createPercentChangeReactiveState | percentChange |
| createFfillReactiveState | ffill |
| createIterateReactiveState | iterate |
| createEwmMeanReactiveState | ewmMean |
| createEwmVarReactiveState | ewmVar |
| createEwmStdReactiveState | ewmStd |
| createEwmCovReactiveState | ewmCov |
| createEwmCorrReactiveState | ewmCorr |
| createMcountReactiveState | mcount |
| createMavgReactiveState | mavg |
| createMsumReactiveState | msum |
| createMprodReactiveState | mprod |
| createMvarReactiveState | mvar |
| createMvarpReactiveState | mvarp |
| createMstdReactiveState | mstd |
| createMstdpReactiveState | mstdp |
| createMskewReactiveState | mskew |
| createMkurtosisReactiveState | mkurtosis |
| createMminReactiveState | mmin |
| createMmaxReactiveState | mmax |
| createMiminReactiveState | mimin |
| createMimaxReactiveState | mimax |
| createMfirstReactiveState | mfirst |
| createMlastReactiveState | mlast |
| createMmedReactiveState | mmed |
| createMpercentileReactiveState | mpercentile |
| createMrankReactiveState | mrank |
| createMcorrReactiveState | mcorr |
| createMcovarReactiveState | mcovar |
| createMbetaReactiveState | mbeta |
| createMwsumReactiveState | mwsum |
| createMwavgReactiveState | mwavg |
| createMslrReactiveState | mslr |
| createCumavgReactiveState | cumavg |
| createCumsumReactiveState | cumsum |
| createCumprodReactiveState | cumprod |
| createCumcountReactiveState | cumcount |
| createCumvarReactiveState | cumvar |
| createCumminReactiveState | cummin |
| createCummaxReactiveState | cummax |
| createCumvarpReactiveState | cumvarp |
| createCumstdReactiveState | cumstd |
| createCumstdpReactiveState | cumstdp |
| createCumcorrReactiveState | cumcorr |
| createCumcovarReactiveState | cumcovar |
| createCumbetaReactiveState | cumbeta |
| createCumwsumReactiveState | cumwsum |
| createCumwavgReactiveState | cumwavg |
| createCumfirstNotReactiveState | cumfirstNot |
| createCumlastNotReactiveState | cumlastNot |
| createCummedReactiveState | cummed |
| createCumpercentileReactiveState | cumpercentile |
| createCumPositiveStreakReactiveState | cumPositiveStreak |
| createCumnuniqueReactiveState | cumnunique |
| createMovingReactiveState | moving |
| createWindowReactiveState | window |
| createTMoveReactiveState | tmove |
| createTMovingsumReactiveState | tmsum |
| createTMovingsum2ReactiveState | tmsum2 |
| createTMovingminReactiveState | tmmin |
| createTMovingmaxReactiveState | tmmax |
| createTMovingrankReactiveState | tmrank |
| createTMovingmedReactiveState | tmmed |
| createTMovingpercentileReactiveState | tmpercentile |
| createTMovingprodReactiveState | tmprod |
| createTMovingcorrReactiveState | tmcorr |
| createTMovingbetaReactiveState | tmbeta |
| createTMovingcovarReactiveState | tmcovar |
| createTMovingwsumReactiveState | tmwsum |
| createTMovingwavgReactiveState | tmwavg |
| createTMovingskewReactiveState | tmskew |
| createTMovingkurtosisReactiveState | tmkurtosis |
| createTMovingavgReactiveState | tmavg |
| createTMovingcountReactiveState | tmcount |
| createTMovingvarReactiveState | tmvar |
| createTMovingvarpReactiveState | tmvarp |
| createTMovingstdReactiveState | tmstd |
| createTMovingstdpReactiveState | tmstdp |
| createTMovingfirstReactiveState | tmfirst |
| createTMovinglastReactiveState | tmlast |
| createTMovingReactiveState | tmoving |
| createMmadReactiveState | mmad |
| createCumsumTopNReactiveState | cumsumTopN |
| createCumavgTopNReactiveState | cumavgTopN |
| createCumstdTopNReactiveState | cumstdTopN |
| createCumstdpTopNReactiveState | cumstdpTopN |
| createCumvarTopNReactiveState | cumvarTopN |
| createCumvarpTopNReactiveState | cumvarpTopN |
| createCumbetaTopNReactiveState | cumbetaTopN |
| createCumcorrTopNReactiveState | cumcorrTopN |
| createCumcovarTopNReactiveState | cumvarTopN |
| createCumwsumTopNReactiveState | cumwsumTopN |
| createCumskewTopNReactiveState | cumskewTopN |
| createCumkurtosisTopNReactiveState | cumkurtosisTopN |
| createMsumTopNReactiveState | cumsumTopN |
| createMavgTopNReactiveState | mavgTopN |
| createMstdpTopNReactiveState | mstdpTopN |
| createMstdTopNReactiveState | mstdTopN |
| createMvarpTopNReactiveState | mvarpTopN |
| createMvarTopNReactiveState | mvarTopN |
| createMcorrTopNReactiveState | mcorrTopN |
| createMbetaTopNReactiveState | mbetaTopN |
| createMcovarTopNReactiveState | mcovarTopN |
| createMwsumTopNReactiveState | mwsumTopN |
| createMskewTopNReactiveState | mskewTopN |
| createMkurtosisTopNReactiveState | mkurtosisTopN |
| createTMovingsumTopNReactiveState | tmsumTopN |
| createTMovingavgTopNReactiveState | tmavgTopN |
| createTMovingstdpTopNReactiveState | tmstdpTopN |
| createTMovingstdTopNReactiveState | tmstdTopN |
| createTMovingvarpTopNReactiveState | tmvarpTopN |
| createTMovingvarTopNReactiveState | tmvarTopN |
| createTMovingskewTopNReactiveState | tmskewTopN |
| createTMovingkurtosisTopNReactiveState | tmkurtosisTopN |
| createTMovingcorrTopNReactiveState | tmcorrTopN |
| createTMovingbetaTopNReactiveState | tmbetaTopN |
| createTMovingcovarTopNReactiveState | tmcovarTopN |
| createTMovingwsumTopNReactiveState | tmwsumTopN |
| createDynamicGroupCumcountReactiveState | dynamicGroupCumcount |
| createDynamicGroupCumsumReactiveState | dynamicGroupCumsum |
| createSegmentbyReactiveState | segmentby |
| createPrevStateReactiveState | prevState |
| createTopRangeReactiveState | topRange |
| createLowRangeReactiveState | lowRange |
| createMmaxPositiveStreakReactiveState | mmaxPositiveStreak |
| createSumbarsReactiveState | sumbars |
| createTrueRangeReactiveState | trueRange |
| createConditionalIterateReactiveState | conditionalIterate |
| createStateIterateReactiveState | stateIterate |
| createMovingWindowDataReactiveState | moving |
| createTMovingWindowDataReactiveState | tmoving |
| createGenericStateIterateReactiveState | genericStateIterate |
| createGenericTStateIterateReactiveState | genericTStateIterate |
| createAccumulateReactiveState | accumulate |
