genericTStateIterate

Syntax

genericTStateIterate(T, X, initial, window, func, [leftClosed=false])

Arguments

T is a non-strictly increasing (i.e., non-decreasing) vector of temporal or integral type. It cannot contain NULL values. Out-of-order records are discarded from the calculation.

X can be one or more columns from the input table, or the results of applying a vector function to those columns. To leave X unspecified, set it to []; to specify multiple columns, pass them as a tuple.

initial is the column whose values fill the result column of the output table within the initial window. It can be a column from the input table, or the result of applying a vector function to one. If the timestamp of the first record is t0, the initial window is [t0, t0 + window), measured in time rather than in number of records.

window is a positive integer or a DURATION scalar that specifies the window size. When window is an integer, it has the same time unit as T.

func is a stateless user-defined function that returns a scalar. It receives the following arguments, in order:
  • First, a vector of the results already computed within the window.

  • Next, the current record's values of the columns specified in X, one argument per column.

  • [Optional] Any additional fixed constants. Fix these arguments with partial application.

leftClosed (optional) is a Boolean value indicating whether the left boundary of the window is inclusive. The default value is false.
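To make the calling convention of func concrete, here is a minimal Python analogue (not DolphinDB code). The function my_iter_func, its two X columns (volume and price) and the constant decay are hypothetical; Python's functools.partial stands in for the partial application used to fix constant arguments.

```python
from functools import partial


def my_iter_func(prev_results, volume, price, decay):
    """Hypothetical func with two X columns and one fixed constant.

    prev_results: factor values already computed within the window
    volume, price: the current record's values of the two X columns
    decay: a fixed constant, bound via partial application below
    """
    return decay * sum(prev_results) + volume * price


# Bind the constant; functools.partial plays the role that partial
# application plays when fixing constant arguments of func.
func = partial(my_iter_func, decay=0.5)

# Argument order assumed here: window results first, then one value per X column.
print(func([1.5, 1.6], 300, 1.7))
```

Only the remaining positional arguments (window results and X values) are supplied per record; the bound constant stays the same for every call.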

Details

This function performs iterative calculation over time-based windows: each result is computed from the results of earlier records that fall within a sliding time window.

Suppose T is a time column, X is [X1, X2, ..., Xn], column "factor" in the output table holds the calculation results, column "initial" is the initial column, window is set to "w", and the iterate function is "func".

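For the k-th record (with timestamp Tk), the calculation rule is:
  • If Tk ∈ [T1, T1+w): factor[k] = initial[k]

  • Otherwise: factor[k] = func(subFactor, X1[k], X2[k], … , Xn[k]), where

    • subFactor is the vector of factor values (including those filled from initial) of the preceding records whose timestamps fall within the window

    • the window for the (k+1)-th record is (Tk-w, Tk] (when leftClosed=false) or [Tk-w, Tk] (when leftClosed=true). In other words, the window used for a record ends at the timestamp of the record immediately before it.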
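The rules above can be modeled in plain Python to check the window semantics. This is a sketch, not DolphinDB's implementation. It assumes T is already sorted (DolphinDB discards out-of-order records), it includes only records before k when timestamps tie, and it rescans each window linearly instead of maintaining state incrementally. The function name generic_tstate_iterate is hypothetical.

```python
from datetime import datetime, timedelta


def generic_tstate_iterate(T, X, initial, window, func, left_closed=False):
    """Plain-Python model of the calculation rule (hypothetical helper).

    T: sorted timestamps; X: list of columns aligned with T;
    initial: column aligned with T; window: a timedelta (or number) for T.
    """
    factor = []
    t0 = T[0]
    for k, tk in enumerate(T):
        if tk < t0 + window:
            # Initial window [t0, t0 + window): copy the initial column.
            factor.append(initial[k])
            continue
        # The window ends at the timestamp of the preceding record.
        right = T[k - 1]
        left = right - window
        sub_factor = [
            factor[j]
            for j in range(k)
            if (left <= T[j] if left_closed else left < T[j]) and T[j] <= right
        ]
        # func gets the windowed results first, then the k-th value of each X column.
        factor.append(func(sub_factor, *(col[k] for col in X)))
    return factor


# Data from the examples on this page.
T = [datetime(2021, 9, 9, 9, m, s) for m, s in [(28, 0), (28, 30), (30, 0), (31, 0), (32, 0)]]
volume = [310, 280, 300, 290, 240]
price = [1.5, 1.6, 1.7, 1.6, 1.5]


def myfunc(x, w):
    # Same logic as the DolphinDB myfunc: sum(x*w)
    return sum(v * w for v in x)


print(generic_tstate_iterate(T, [volume], price, timedelta(minutes=2), myfunc))
print(generic_tstate_iterate(T, [volume], price, timedelta(minutes=2), myfunc, left_closed=True))
```

Run on this data, the two calls should reproduce the factor1 columns of Examples 1 and 2, up to floating-point rounding.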

Examples

Example 1. When leftClosed is set to false:

// func: multiply each result in the window by the current volume and sum them
def myfunc(x, w){
re = sum(x*w)
return re
}

dateTime = 2021.09.09T09:28:00.000 2021.09.09T09:28:30.000 2021.09.09T09:30:00.000 2021.09.09T09:31:00.000 2021.09.09T09:32:00.000
securityID = `600021`600021`600021`600021`600021
volume = 310 280 300 290 240
price = 1.5 1.6 1.7 1.6 1.5
t = table(1:0, `dateTime`securityID`volume`price, [TIMESTAMP, SYMBOL, INT, DOUBLE])
tableInsert(t, dateTime, securityID, volume, price)
output = table(100:0, `securityID`dateTime`factor1, [SYMBOL, TIMESTAMP, DOUBLE])

engine = createReactiveStateEngine(name="test", metrics=[<dateTime>, <genericTStateIterate(dateTime,volume,price,2m,myfunc{,})>], dummyTable=t, outputTable=output, keyColumn=`securityID, keepOrder=true)
engine.append!(t)
dropStreamEngine(`test)
select * from output
securityID dateTime factor1
600021 2021.09.09T09:28:00.000 1.5
600021 2021.09.09T09:28:30.000 1.6
600021 2021.09.09T09:30:00.000 930
600021 2021.09.09T09:31:00.000 270164
600021 2021.09.09T09:32:00.000 65062560

The above example is calculated as follows:

  • As the timestamp of the first record is 09:28:00.000 and the window size is 2 minutes, the initial window is [2021.09.09T09:28:00.000, 2021.09.09T09:30:00.000). Since the first 2 records fall within this window, their price values are output to factor1 directly.

  • For the 3rd record, the window ends at the timestamp of the 2nd record: (2021.09.09T09:26:30.000, 2021.09.09T09:28:30.000]. The results in this window, [1.5, 1.6], and the 3rd record's volume, 300, are passed to myfunc: myfunc([1.5, 1.6], 300) = 930.

  • For the 4th record, the window is (2021.09.09T09:28:00.000, 2021.09.09T09:30:00.000]. Because the left boundary is open, the 1st record (09:28:00.000) is excluded, so the results in this window are [1.6, 930]. With volume 290, myfunc([1.6, 930], 290) = 270164.

The calculation proceeds in the same way as more data arrives.
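As a cross-check, the arithmetic for records 3 to 5 can be written out. Since myfunc(x, w) = sum(x*w) equals w times the sum of x, each result is the sum of the window's results multiplied by the current volume. The 5th record, not covered in the bullets above, uses the window (09:29:00.000, 09:31:00.000], which contains the results of the 3rd and 4th records:

```latex
\begin{aligned}
\text{factor1}_3 &= (1.5 + 1.6) \times 300 = 930 \\
\text{factor1}_4 &= (1.6 + 930) \times 290 = 270164 \\
\text{factor1}_5 &= (930 + 270164) \times 240 = 65062560
\end{aligned}
```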

Example 2. When leftClosed is set to true:

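// reuse myfunc and t from Example 1; recreate the output table so it starts empty
output = table(100:0, `securityID`dateTime`factor1, [SYMBOL, TIMESTAMP, DOUBLE])
engine = createReactiveStateEngine(name="test", metrics=[<dateTime>, <genericTStateIterate(dateTime,volume,price,2m,myfunc{,},true)>], dummyTable=t, outputTable=output, keyColumn=`securityID, keepOrder=true)
engine.append!(t)
dropStreamEngine(`test)
select * from output
securityID dateTime factor1
600021 2021.09.09T09:28:00.000 1.5
600021 2021.09.09T09:28:30.000 1.6
600021 2021.09.09T09:30:00.000 930
600021 2021.09.09T09:31:00.000 270599
600021 2021.09.09T09:32:00.000 65166960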

The above example is calculated as follows:

  • As the timestamp of the first record is 09:28:00.000 and the window size is 2 minutes, the initial window is [2021.09.09T09:28:00.000, 2021.09.09T09:30:00.000). Since the first 2 records fall within this window, their price values are output to factor1 directly.

  • For the 3rd record, the window is [2021.09.09T09:26:30.000, 2021.09.09T09:28:30.000]. The results in this window are [1.5, 1.6] and the volume is 300, so myfunc([1.5, 1.6], 300) = 930.

  • For the 4th record, the window is [2021.09.09T09:28:00.000, 2021.09.09T09:30:00.000]. Because the left boundary is now closed, the 1st record (09:28:00.000) is included, so the results in this window are [1.5, 1.6, 930]. With volume 290, myfunc([1.5, 1.6, 930], 290) = 270599.

The calculation proceeds in the same way as more data arrives.
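With leftClosed=true, the only window whose membership changes is the 4th record's: its left boundary, 09:28:00.000, coincides with the 1st record's timestamp, so 1.5 is now included. The 5th record's window [09:29:00.000, 09:31:00.000] still contains the 3rd and 4th records, but the 4th result differs, which carries through:

```latex
\begin{aligned}
\text{factor1}_3 &= (1.5 + 1.6) \times 300 = 930 \\
\text{factor1}_4 &= (1.5 + 1.6 + 930) \times 290 = 270599 \\
\text{factor1}_5 &= (930 + 270599) \times 240 = 65166960
\end{aligned}
```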

Related function: genericStateIterate