genericStateIterate

Syntax

genericStateIterate(X, initial, window, func)

Arguments

X can be column(s) from the input table, or the calculation results by applying a vector function to the column(s). You can set X to [] to leave it unspecified; or use a tuple to specify multiple columns for X.

initial can be a column from the input table, or the calculation results by applying a vector function to it. It is used to fill the first to window-th records in the output table.

window is a non-negative integer that specifies the window size (measured by the number of records).

func is a stateless user-defined function with one scalar as the return value. Arguments passed to func are as follows:
  • The first argument is a vector containing the previous window results if window>0, or the previous result if window=0.

  • Then followed by columns specified in X.

  • [Optional] Other fixed constants to be passed to func. In this case, you can fix the arguments with partial application.

Details

This function performs calculation with count-based sliding windows iteratively.

Suppose X is specified as [X1, X2, ..., Xn], column "factor" in the output table holds the calculation results, column "initial" is the initial column, window is set to "w", and the iterate function is "func". For the k-th record, the calculation rule is:
  • When w-0:

    • k=1: factor[0] = func(initial[0], X1[0], X2[0], … , Xn[0])

    • k>1: factor[k-1] = func(factor[(k-2)], X1[k-1], X2[k-1], … , Xn[k-1])

  • When w>0:

    • k <= w: factor[k-1] = initial[k-1]

    • k > w: factor[k-1] = func(factor[(k-1-w):k-1], X1[k-1], X2[k-1], … , Xn[k-1])

Note that when a pair is used to indicate index, the right boundary is not inclusive, i.e., the range of (k-1-w):k-1 is [k-1-w, k-1).

Examples

// define a function
def myfunc(x, w){
    re = sum(x*w)
    return re
}

dateTime = 2021.09.09T09:30:00.000 2021.09.09T09:31:00.000 2021.09.09T09:32:00.000 2021.09.09T09:33:00.000 2021.09.09T09:34:00.000
securityID = `600021`600021`600021`600021`600021
volume = 310 280 300 290 240
price = 1.5 1.6 1.7 1.6 1.5
t = table(1:0, `dateTime`securityID`volume`price, [TIMESTAMP, SYMBOL, INT, DOUBLE])
tableInsert(t, dateTime, securityID, volume, price)
output = table(100:0, `securityID`dateTime`factor1, [SYMBOL, TIMESTAMP, DOUBLE])

engine = createReactiveStateEngine(name="test", metrics=[<dateTime>, <genericStateIterate(volume,price,3,myfunc{,})>], dummyTable=t, outputTable=output, keyColumn=`SecurityID, keepOrder=true)
engine.append!(t)
dropStreamEngine(`test)
securityID dateTime factor1
600021 2021.09.09T09:30:00.000 1.5
600021 2021.09.09T09:31:00.000 1.6
600021 2021.09.09T09:32:00.000 1.7
600021 2021.09.09T09:33:00.000 1,392
600021 2021.09.09T09:34:00.000 334,872

The above example is calculated as follows:

  • As window is set to 3, the first 3 records of factor1 in the initial window take the corresponding values of column price.

  • When the 4th record arrives, the initial window [1.5, 1.6, 1.7] and the current value of volume 290 are used for iteration. The result of myfunc([1.5, 1.6, 1.7], 290) is 1392.

  • When the 5th record arrives, the previous [1.6, 1.7, 1392] and the current value of volume 240 are used for iteration. The result of myfunc([1.6, 1.7, 1392], 240) is 334872.

The calculation process is preceded in the same way if data ingestion continues.

Related function: genericTStateIterate