stateIterate

Syntax

stateIterate(X, initial, initialWindow, iterateFunc, combineCoeff)

This function can only be used as a state function in the reactive state engine.

Arguments

X is a vector. It can be a column from the engine's input table, or the result of a vectorized function with the column as its input argument.

initial is a vector used to fill the first initialWindow values in the corresponding result column of the engine's output table. It can be a column from the engine's input table, or the result of a vectorized function with the column as its input argument.

initialWindow is a positive integer giving the size of the initial window. The records at indices [0, initialWindow) take their results from initial.

iterateFunc is the function for iteration, whose only parameter is the column from the output table. Currently, only the following functions are supported (use partial application to specify functions with multiple parameters):
  • Moving functions: tmove, tmavg, tmmax, tmmin, tmsum, mavg, mmax, mmin, mcount, msum

  • Cumulative window functions: cumlastNot, cumfirstNot

  • Order-sensitive functions: ffill, move

Note:
  • Iteration runs over historical data: the output for the current record is calculated from X and the results already in the output table.

  • When calculating with time-based moving windows, windows are determined by the current timestamp T, and the interval is (T - window, T).
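The time-based window rule in the note above can be illustrated with a minimal Python sketch. It reads the interval (T - window, T) as open at both ends, as the parentheses suggest. The helper name in_time_window and the integer timestamps are hypothetical and are not part of the engine's API.

```python
def in_time_window(timestamps, t, window):
    """Return the indices of earlier records whose timestamp lies
    strictly inside the open interval (t - window, t)."""
    return [i for i, ts in enumerate(timestamps) if t - window < ts < t]

# Hypothetical timestamps of results already in the output table.
past_ts = [1, 3, 5, 7, 9]

# For a current timestamp T = 10 and window = 5 the interval is (5, 10),
# so the record stamped 5 is excluded and only 7 and 9 qualify.
idx = in_time_window(past_ts, t=10, window=5)
print(idx)
```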

combineCoeff is a vector of length 2. Its elements are the weights applied to X and to the result of iterateFunc, respectively, when the two are combined.

Details

Supposing the iteration is based only on the previous result, the calculation logic for the record at index k (k = 0, 1, 2, ...) is as follows, where the column "factor" holds the results:

  • k < initialWindow: factor[k] = initial[k]

  • k >= initialWindow: factor[k] = combineCoeff[0] * X[k] + combineCoeff[1] * iterateFunc(factor)[k-1]

If iterateFunc is a window function, the iteration is based on multiple previous results.
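The recurrence above can be simulated outside the engine. The following Python sketch is a plain re-implementation for illustration only, not the engine's code. The names state_iterate and msum are hypothetical stand-ins, and this msum simply sums whatever values are available when fewer than window exist, which may differ from the built-in function.

```python
from functools import partial

def msum(values, window):
    """Simplified moving sum: entry i sums up to `window` values ending at i."""
    return [sum(values[max(0, i - window + 1): i + 1]) for i in range(len(values))]

def state_iterate(x, initial, initial_window, iterate_func, combine_coeff):
    """Apply the recurrence to a single group of records, in order."""
    factor = []
    for k in range(len(x)):
        if k < initial_window:
            # Initial window: copy the value from `initial`.
            factor.append(initial[k])
        else:
            # iterate_func only sees results produced so far (indices 0..k-1).
            prev = iterate_func(factor)[k - 1]
            factor.append(combine_coeff[0] * x[k] + combine_coeff[1] * prev)
    return factor

# partial(msum, window=3) plays the role of the partial application msum{, 3}.
result = state_iterate([1, 2, 3, 4, 5, 6], [10] * 6, 3,
                       partial(msum, window=3), [0.5, 0.5])
print(result)
```

With a window function such as the moving sum, each new value depends on several earlier results rather than on the previous one alone.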

Examples

trade = table(take("A", 6) join take("B", 6) as sym,  1..12 as val0,  take(10, 12) as val1)
trade;
sym val0 val1
A 1 10
A 2 10
A 3 10
A 4 10
A 5 10
A 6 10
B 7 10
B 8 10
B 9 10
B 10 10
B 11 10
B 12 10

The following example defines a reactive state streaming engine and conducts the calculation within each group of sym.

  • For the first initialWindow (3) records of each group, the formula is factor[k]=initial[k], so factor[0], factor[1], and factor[2] take the values of val1.

  • For the subsequent records, the formula is factor[k]=0.5*val0[k]+0.5*msum(factor, 3)[k-1].

For the 4th record (sym=`A, val0=4) in the output table, there are 3 historical values in the factor column ([10,10,10]). Therefore, for the current record, the output is 0.5*4+0.5*msum([10, 10, 10], 3)[2]=17. Similarly, for the record (sym=`A, val0=5), the output is 0.5*5+0.5*msum([10, 10, 10, 17], 3)[3]=21.

inputTable = streamTable(1:0, `sym`val0`val1, [SYMBOL, INT, INT])
outputTable = table(100:0, `sym`factor, [STRING, DOUBLE])
engine = createReactiveStateEngine(name="rsTest", metrics=<[stateIterate(val0, val1, 3, msum{, 3}, [0.5, 0.5])]>, dummyTable=inputTable, outputTable=outputTable, keyColumn=["sym"], keepOrder=true)

engine.append!(trade)
select * from outputTable
sym factor
A 10
A 10
A 10
A 17
A 21
A 27
B 10
B 10
B 10
B 20
B 25.5
B 33.75
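The output table can be cross-checked by replaying the same records in plain Python, keeping a separate result history per sym in the way keyColumn separates groups. This is a sketch for verification only. It hard-codes the moving sum over the last 3 results and does not call any engine API; all variable names are hypothetical.

```python
from collections import defaultdict

# Same records as the trade table: (sym, val0, val1).
rows = [("A", v, 10) for v in range(1, 7)] + [("B", v, 10) for v in range(7, 13)]

INITIAL_WINDOW = 3   # first 3 records per sym take val1
WINDOW = 3           # moving-sum window over earlier results
COEFF = (0.5, 0.5)   # weights for val0 and the moving sum

history = defaultdict(list)  # results produced so far, per sym
output = []
for sym, val0, val1 in rows:
    past = history[sym]
    if len(past) < INITIAL_WINDOW:
        factor = float(val1)
    else:
        factor = COEFF[0] * val0 + COEFF[1] * sum(past[-WINDOW:])
    past.append(factor)
    output.append((sym, factor))

for sym, factor in output:
    print(sym, factor)
```

The printed factors match the table above: group B restarts from its own three initial values instead of continuing from the results of group A.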