Stateful Operator

A stateful operator is an operator that maintains state while processing streaming data. Its output depends not only on the current input but also on previous states or historical records. After processing the current input, a stateful operator updates its state and uses it for later computation. As a result, the same input may produce different outputs at different times.
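The distinction can be illustrated with ordinary vector functions, outside of any streaming engine. This is only a minimal sketch: cumsum keeps a running total, so its result at each position depends on everything seen before, whereas multiplying by a constant depends only on the current element.

```dolphindb
x = [1, 1, 1]
x * 2       // stateless: [2, 2, 2]; equal inputs give equal outputs
cumsum(x)   // stateful: [1, 2, 3]; equal inputs give different outputs
```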

Users can use DolphinDB’s built-in stateful operators in streaming engines such as the reactive state engine, time-series engines, and the window join engine. These built-in stateful operators are optimized for incremental computation, which improves performance. In the reactive state engine, users can also define their own stateful operators for complex stream computation.

Built-in Stateful Operator

Built-in functions used as stateful operators in the reactive state engine:

Built-in functions used as stateful operators in time-series engines:

corr, covar, first, last, max, med, min, percentile, quantile, std, var, sum, sum2, sum3, sum4, wavg, wsum, count, firstNot, ifirstNot, lastNot, ilastNot, imax, imin, nunique, prod, sem, mode, searchK, beta, avg.
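As a hedged sketch of how built-in functions from this list are used, the following creates a time-series engine that computes avg and sum over 1-minute tumbling windows per symbol. The table names trades and tsOutput, the column names, and the engine name tsDemo are hypothetical and chosen only for illustration.

```dolphindb
// Hypothetical input stream table and output table, for illustration only.
share(streamTable(1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT]), `trades)
share(table(1000:0, `time`sym`avgPrice`sumVolume, [TIMESTAMP, SYMBOL, DOUBLE, INT]), `tsOutput)

// windowSize and step use the precision of the time column (milliseconds for TIMESTAMP),
// so 60000 means a 1-minute window that advances by 1 minute.
tsDemo = createTimeSeriesEngine(name="tsDemo", windowSize=60000, step=60000,
    metrics=<[avg(price), sum(volume)]>,
    dummyTable=trades, outputTable=tsOutput,
    timeColumn=`time, keyColumn=`sym)

// Release the engine when it is no longer needed.
dropStreamEngine("tsDemo")
```

Each metric maintains its own per-key state inside the engine, so each incoming row updates the running aggregate rather than triggering a recomputation over the whole window.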

Built-in functions used as stateful operators in the window join engine:

sum, sum2, avg, std, var, corr, covar, wavg, wsum, beta, max, min, last, first, med, percentile.

User-Defined Stateful Operator

In the reactive state engine, users can implement a stateful operator by defining a function and declaring it with the keyword @state before the definition. Note the following points:

(1) Add @state before the function definition to declare it as a state function. State functions support the following statements:
  • Assignment and return statements
  • if...else statements with scalar expressions (since 1.30.21/2.00.9)
  • for loops, including break and continue (since 1.30.23/2.00.11). The number of loop iterations must be less than 100. Nested for loops are currently unsupported.

(2) Both stateless and state functions can be used in a reactive state engine, but the metrics parameter cannot be specified as a stateless function that nests a state function.

(3) If the rvalue of an assignment statement is a built-in or user-defined function that returns multiple values, all returned values must be assigned to variables in the same statement. In the following example, the user-defined state function calls linearTimeTrend, which returns two values.

@state
def forcast2(S, N){
	linearregIntercept, linearregSlope = linearTimeTrend(S, N)
	return (N - 1) * linearregSlope + linearregIntercept
}
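A state function defined this way is referenced in the metrics parameter like any built-in function. The sketch below assumes forcast2 has already been defined as above. The table names forecastInput and forecastOutput, the engine name forecastDemo, and the window length 5 are hypothetical choices for illustration.

```dolphindb
// Hypothetical input and output tables, for illustration only.
share(streamTable(1:0, `sym`time`price, [SYMBOL, TIMESTAMP, DOUBLE]), `forecastInput)
share(table(1:0, `sym`time`forecast, [SYMBOL, TIMESTAMP, DOUBLE]), `forecastOutput)

// forcast2 is evaluated incrementally for each sym, over a sliding window of 5 prices.
createReactiveStateEngine(name="forecastDemo",
    metrics=<[time, forcast2(price, 5)]>,
    dummyTable=forecastInput, outputTable=forecastOutput, keyColumn="sym")

// Release the engine when it is no longer needed.
dropStreamEngine("forecastDemo")
```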

Applications

The following case uses the price rate of change to show how to perform stateful computation with the reactive state engine. The price rate of change (ROC) is a momentum-based indicator used in the stock market that measures the percentage change between the current price and the price a certain number of periods ago. In this case, ROC is defined as the ratio of the latest price in the current market snapshot to the latest price in the previous market snapshot, minus 1.

(1) Create a shared stream table for publishing data.

share(table=streamTable(1:0, `securityID`datetime`lastPrice`openPrice, [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE]), sharedName=`tick)

(2) Create a shared stream table for storing processed data.

share(table=streamTable(10000:0, `securityID`datetime`factor, [SYMBOL, TIMESTAMP, DOUBLE]), sharedName=`resultTable)

(3) Define the stateful operator priceChange.

@state
def priceChange(lastPrice){
    return lastPrice \ prev(lastPrice) - 1
}

(4) Create a reactive state engine reactiveDemo.

// Drop any existing engine with the same name so that the script can be rerun.
try{ dropStreamEngine("reactiveDemo") } catch(ex){ print(ex) }
createReactiveStateEngine(name="reactiveDemo",
    metrics=<[datetime, priceChange(lastPrice)]>,
    dummyTable=tick, outputTable=resultTable, keyColumn="securityID")

When creating the reactive state engine, specify securityID as the grouping column (keyColumn) so that the ROC is calculated separately for each stock. The input message format is the same as the schema of the stream table tick, and the results are output to the table resultTable. The metrics to be calculated are defined in the parameter metrics.

(5) Submit subscription.

subscribeTable(tableName="tick", actionName="reactiveDemo",
    handler=getStreamEngine(`reactiveDemo), msgAsTable=true, offset=-1)

(6) Write mock data to table tick.

securityID = ["AA", "AA", "BB", "AA"]
dateTime = [2024.06.02 09:00:00.000, 2024.06.02 09:01:00.000, 2024.06.02 09:03:00.000, 2024.06.02 09:04:00.000]
lastPrice = [9.99, 1.58, 5.37, 9.82]
openPrice = [10.05, 1.50, 5.25, 9.70]
simulateData = table(securityID, dateTime, lastPrice, openPrice)
tableInsert(tick, simulateData)

(7) Check data in table resultTable.

res = select * from resultTable
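The values to expect in the factor column can be checked by hand with plain vector arithmetic. The sketch below reproduces the calculation for security AA only, using its three prices from the mock data in arrival order. It assumes the engine emits one output row per input row. Because subscriptions are processed asynchronously, resultTable may take a moment to fill after the insert.

```dolphindb
// Prices of security AA in arrival order, copied from the mock data.
aaPrices = [9.99, 1.58, 9.82]

// Same expression as in priceChange, applied to a plain vector.
aaPrices \ prev(aaPrices) - 1
// first element is NULL (there is no previous price);
// the others are approximately -0.8418 and 5.2152
```

Security BB appears only once in the mock data, so it has no previous price and its factor is NULL as well.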

(8) Unsubscribe from the stream table tick. Note that all subscriptions to a stream table must be canceled before deleting the table.

unsubscribeTable(tableName="tick", actionName="reactiveDemo")

(9) Release the reactive state engine reactiveDemo from memory.

dropStreamEngine(name="reactiveDemo")

(10) Delete the stream tables.

dropStreamTable(tableName="tick")
dropStreamTable(tableName="resultTable")