stateIterate
Syntax
stateIterate(X, initial, initialWindow, iterateFunc, combineCoeff)
This function can only be used as a state function in the reactive state engine.
Arguments
X is a vector. It can be a column from the engine's input table, or the result of a vectorized function with the column as its input argument.
initial is a vector used to fill the first initialWindow values in the corresponding result column of the engine's output table. It can be a column from the engine's input table, or the result of a vectorized function with the column as its input argument.
initialWindow is a positive integer that sets the size of the initial window, which covers the records with index in [0, initialWindow).
iterateFunc is the function applied at each iteration. It can be one of the following:
- Moving functions: tmove, tmavg, tmmax, tmmin, tmsum, mavg, mmax, mmin, mcount, msum
- Cumulative window functions: cumlastNot, cumfirstNot
- Order-sensitive functions: ffill, move
Note:
- Because the iteration is based on historical data, the output for the current record is calculated from the historical results in the output table and X.
- When calculating with time-based moving windows, the window is determined by the current timestamp T and covers the interval (T - window, T).
combineCoeff is a vector of length 2. Its elements are the coefficients applied to X and to the result of iterateFunc, respectively.
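The open interval (T - window, T) stated for time-based moving windows can be illustrated with a short Python sketch. This is only a model of the interval rule, not engine code: the helper name `in_time_window` is hypothetical, and integer timestamps are an assumption made for readability.

```python
def in_time_window(timestamps, current_t, window):
    """Return the indices of historical records whose timestamp t
    satisfies current_t - window < t < current_t (both ends open)."""
    return [i for i, t in enumerate(timestamps)
            if current_t - window < t < current_t]

# Hypothetical historical timestamps, in arbitrary integer units.
history_ts = [1, 2, 3, 4, 5]

# With T = 5 and window = 3 the interval is (2, 5):
# timestamps 3 and 4 qualify, while 2 and 5 fall on the open ends.
selected = in_time_window(history_ts, current_t=5, window=3)
print(selected)
```

Under this reading, a historical record stamped exactly at T or at T - window does not contribute to the window.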
Details
Supposing the iteration is based only on the previous result, the calculation logic for the k-th record (k = 0, 1, 2, ...) is as follows, where the column "factor" holds the results:
- k < initialWindow: factor[k] = initial[k]
- k >= initialWindow: factor[k] = combineCoeff[0] * X[k] + combineCoeff[1] * iterateFunc(factor)[k-1]
If iterateFunc is a window function, the iteration is based on multiple previous results.
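The recurrence above can be modeled in a few lines of Python. This is a reference sketch for a single group, not the engine's implementation: `state_iterate` and `last_result` are hypothetical names, and `iterate_func` is assumed to receive the results produced so far and return the value that corresponds to iterateFunc(factor)[k-1].

```python
def state_iterate(x, initial, initial_window, iterate_func, combine_coeff):
    """Model of the documented recurrence for one group of records."""
    factor = []
    for k in range(len(x)):
        if k < initial_window:
            # Inside the initial window the result is copied from `initial`.
            factor.append(float(initial[k]))
        else:
            # `factor` currently holds results 0..k-1, so this call plays
            # the role of iterateFunc(factor)[k-1].
            prev = iterate_func(factor)
            factor.append(combine_coeff[0] * x[k] + combine_coeff[1] * prev)
    return factor

# Simplest case: the iteration depends only on the previous result.
last_result = lambda history: history[-1]

result = state_iterate([1, 2, 3, 4], [10, 10, 10, 10], 2,
                       last_result, [0.5, 0.5])
print(result)  # [10.0, 10.0, 6.5, 5.25]
```

Passing a function that aggregates several trailing results instead of `last_result` gives the window-function case, where each output depends on multiple previous results.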
Examples
trade = table(take("A", 6) join take("B", 6) as sym, 1..12 as val0, take(10, 12) as val1)
trade;
sym | val0 | val1 |
---|---|---|
A | 1 | 10 |
A | 2 | 10 |
A | 3 | 10 |
A | 4 | 10 |
A | 5 | 10 |
A | 6 | 10 |
B | 7 | 10 |
B | 8 | 10 |
B | 9 | 10 |
B | 10 | 10 |
B | 11 | 10 |
B | 12 | 10 |
The following example defines a reactive state streaming engine and performs the calculation within each group of sym.
- For the first initialWindow (3) records of each group, the formula is factor[k] = initial[k], so factor[0], factor[1], and factor[2] take the value of val1.
- For the subsequent records, the formula is factor[k] = 0.5 * val0[k] + 0.5 * msum(factor, 3)[k-1].
For the 4th record (sym=`A, val0=4) in the output table, there are 3 historical values in the factor column ([10,10,10]). Therefore, for the current record, the output is 0.5*4+0.5*msum([10, 10, 10], 3)[2]=17. Similarly, for the record (sym=`A, val0=5), the output is 0.5*5+0.5*msum([10, 10, 10, 17], 3)[3]=21.
inputTable = streamTable(1:0, `sym`val0`val1, [SYMBOL, INT, INT])
outputTable = table(100:0, `sym`factor, [STRING, DOUBLE])
engine = createReactiveStateEngine(name="rsTest", metrics=<[stateIterate(val0, val1, 3, msum{, 3}, [0.5, 0.5])]>, dummyTable=inputTable, outputTable=outputTable, keyColumn=["sym"], keepOrder=true)
engine.append!(trade)
select * from outputTable
sym | factor |
---|---|
A | 10 |
A | 10 |
A | 10 |
A | 17 |
A | 21 |
A | 27 |
B | 10 |
B | 10 |
B | 10 |
B | 20 |
B | 25.5 |
B | 33.75 |
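The factor column above can be cross-checked outside the engine with a plain Python simulation. The sketch below is an independent model of the example, not DolphinDB code: it keeps one history list per sym and approximates msum(factor, 3)[k-1] by summing the last three results, which assumes at least three results already exist (true here because initialWindow is 3).

```python
from collections import defaultdict

# Same data as the trade table: (sym, val0, val1).
rows = ([("A", v, 10) for v in range(1, 7)]
        + [("B", v, 10) for v in range(7, 13)])

INITIAL_WINDOW = 3   # initialWindow
MSUM_WINDOW = 3      # window of msum{, 3}
COEFF = (0.5, 0.5)   # combineCoeff

history = defaultdict(list)  # results produced so far, per sym
output = []
for sym, val0, val1 in rows:
    factor = history[sym]
    if len(factor) < INITIAL_WINDOW:
        value = float(val1)  # initial window: copy val1
    else:
        # Sum of the last three results stands in for msum(factor, 3)[k-1].
        value = COEFF[0] * val0 + COEFF[1] * sum(factor[-MSUM_WINDOW:])
    factor.append(value)
    output.append((sym, value))

for sym, value in output:
    print(sym, value)
```

Keeping a separate history per sym mirrors keyColumn=["sym"] in the engine definition: group B starts its own initial window instead of continuing from group A's results.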