Stateful Operator
A stateful operator is an operator that maintains state while processing streaming data. Its output depends not only on the current input but also on previous state or historical records. After processing the current input, a stateful operator updates its state and uses it in later computation. As a result, the same input may produce different outputs at different times.
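As a minimal illustration of the difference, the sketch below applies a stateless expression and a built-in cumulative function to the same vector. The variable x is made up for this example.

```dolphindb
x = [1, 2, 1, 2]

// Stateless: each output depends only on the current element,
// so equal inputs always give equal outputs.
x * 2        // 2 4 2 4

// Stateful: cumsum carries a running total from one element to the next,
// so the two 1s (and the two 2s) produce different outputs.
cumsum(x)    // 1 3 4 6
```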

Users can use DolphinDB’s built-in stateful operators in streaming engines such as the reactive state engine, time-series engines, and the window join engine. These built-in stateful operators are optimized for incremental computation, which improves computational performance. In the reactive state engine, users can also use user-defined stateful operators for complex stream computation.
Built-in Stateful Operator
Built-in functions used as stateful operators in reactive state engine:
- cumulative function: cumavg, cumsum, cumprod, cumcount, cummin, cummax, cumvar, cumvarp, cumstd, cumstdp, cumcorr, cumcovar, cumbeta, cumwsum, cumwavg, cumfirstNot, cumlastNot, cummed, cumpercentile, cumnunique, cumPositiveStreak, cummdd
- moving function: ema, mavg, msum, mcount, mprod, mvar, mvarp, mstd, mstdp, mskew, mkurtosis, mmin, mmax, mimin, mimax, mmed, mpercentile, mrank, mcorr, mcovar, mbeta, mwsum, mwavg, mmad, mfirst, mlast, mslr, tmove, tmfirst, tmlast, tmsum, tmavg, tmcount, tmvar, tmvarp, tmstd, tmstdp, tmprod, tmskew, tmkurtosis, tmmin, tmmax, tmmed, tmpercentile, tmrank, tmcovar, tmbeta, tmcorr, tmwavg, tmwsum, tmoving, moving, sma, wma, dema, tema, trima, linearTimeTrend, talib, t3, ma, mmaxPositiveStreak
- row-based function: rowMin, rowMax, rowAnd, rowOr, rowXor, rowProd, rowSum, rowSum2, rowSize, rowCount, rowAvg, rowKurtosis, rowSkew, rowVar, rowVarp, rowStd, rowStdp
- order-sensitive function: deltas, ratios, ffill, move, prev, iterate, ewmMean, ewmVar, ewmStd, ewmCov, ewmCorr, prevState, percentChange
- topN function: msumTopN, mavgTopN, mstdpTopN, mstdTopN, mvarpTopN, mvarTopN, mcorrTopN, mbetaTopN, mcovarTopN, mwsumTopN, cumwsumTopN, cumsumTopN, cumvarTopN, cumvarpTopN, cumstdTopN, cumstdpTopN, cumcorrTopN, cumbetaTopN, cumavgTopN, cumskewTopN, cumkurtosisTopN, mskewTopN, mkurtosisTopN, tmsumTopN, tmavgTopN, tmstdTopN, tmstdpTopN, tmvarTopN, tmvarpTopN, tmskewTopN, tmkurtosisTopN, tmbetaTopN, tmcorrTopN, tmcovarTopN, tmwsumTopN
- higher-order function: segmentby (whose parameter func can only take cumsum, cummax, cummin, cumcount, cumavg, cumstd, cumvar, cumstdp, cumvarp), moving, byColumn, accumulate, window
- functions that can only be used in the reactive state engine: stateIterate, conditionalIterate, genericStateIterate, genericTStateIterate
Built-in functions used as stateful operators in time-series engines:
corr, covar, first, last, max, med, min, percentile, quantile, std, var, sum, sum2, sum3, sum4, wavg, wsum, count, firstNot, ifirstNot, lastNot, ilastNot, imax, imin, nunique, prod, sem, mode, searchK, beta, avg.
Built-in functions used as stateful operators in window join engine:
sum, sum2, avg, std, var, corr, covar, wavg, wsum, beta, max, min, last, first, med, percentile.
User-Defined Stateful Operator
In the reactive state engine, users can implement a stateful operator by writing a user-defined function and declaring it with the keyword @state before the definition. Note the following points:
(1) Declare the function with @state before its definition. For state functions, the following statements are supported:
- Assignment and return statements
- if...else statements with scalar expressions (since 1.30.21/2.00.9)
- for loops, including break and continue (since 1.30.23/2.00.11). Loop iterations must be under 100 times. Nested for loops are currently unsupported.
(2) Both stateless and state functions can be used in a reactive state engine, but the metrics parameter cannot be specified as a stateless function that nests a state function.
(3) If the rvalue of an assignment statement is a built-in or user-defined function that returns multiple values, the values must be assigned to variables at the same time. In the following example, the user-defined state function references linearTimeTrend, which returns two values.
@state
def forcast2(S, N){
    linearregIntercept, linearregSlope = linearTimeTrend(S, N)
    return (N - 1) * linearregSlope + linearregIntercept
}
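The following sketch shows one way such a state function might be passed to a reactive state engine. It assumes forcast2 has already been defined in the session as shown above. The engine name forecastDemo, the tables inputSchema and trendOut, the columns sym and price, and the window size 3 are all hypothetical names and values chosen for illustration.

```dolphindb
// Hypothetical in-memory tables used only for this sketch
inputSchema = table(1:0, `sym`price, [SYMBOL, DOUBLE])
trendOut = table(100:0, `sym`forecast, [SYMBOL, DOUBLE])

// forcast2 is the user-defined state function defined above
engine = createReactiveStateEngine(name="forecastDemo",
    metrics=<forcast2(price, 3)>,
    dummyTable=inputSchema, outputTable=trendOut, keyColumn="sym")

// Feed a few rows directly into the engine and inspect the output
data = table(symbol(take(`A, 5)) as sym, [1.0, 2.0, 3.0, 4.0, 5.0] as price)
engine.append!(data)
select * from trendOut

// Clean up the engine created for this sketch
dropStreamEngine("forecastDemo")
```

Appending data directly to the engine avoids setting up a subscription, which keeps the sketch short. A production pipeline would more likely feed the engine through subscribeTable, as in the application below.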
Applications
In the following case, we use the price change ratio as an example to show how to perform stateful computation with the reactive state engine. The price rate of change (ROC) is a momentum-based indicator used in the stock market that measures the percentage change between the current price and the price a certain number of periods ago. In this case, we define ROC as the ratio of the latest price in the current market snapshot to the latest price in the previous market snapshot, minus 1.
(1) Create a shared stream table for publishing data.
share(table=streamTable(1:0, `securityID`datetime`lastPrice`openPrice, [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE]), sharedName=`tick)
(2) Create a shared stream table for storing processed data.
share(table=streamTable(10000:0, `securityID`datetime`factor, [SYMBOL, TIMESTAMP, DOUBLE]), sharedName=`resultTable)
(3) Define the stateful operator priceChange.
@state
def priceChange(lastPrice){
    return lastPrice \ prev(lastPrice) - 1
}
(4) Create a reactive state engine reactiveDemo.
try{ dropStreamEngine("reactiveDemo") } catch(ex){ print(ex) }
createReactiveStateEngine(name="reactiveDemo",
    metrics=<[datetime, priceChange(lastPrice)]>,
    dummyTable=tick, outputTable=resultTable, keyColumn="securityID")
When creating the reactive state engine, specify securityID as the grouping column so that the ROC is calculated separately for each stock. The input message format is the same as that of the stream table tick, and the results are output to the stream table resultTable. The metrics to be calculated are defined in the parameter metrics.
(5) Submit subscription.
subscribeTable(tableName="tick", actionName="reactiveDemo",
    handler=getStreamEngine(`reactiveDemo), msgAsTable=true, offset=-1)
(6) Write mock data to table tick.
securityID = ["AA", "AA", "BB", "AA"]
dateTime = [2024.06.02 09:00:00.000, 2024.06.02 09:01:00.000, 2024.06.02 09:03:00.000, 2024.06.02 09:04:00.000]
lastPrice = [9.99, 1.58, 5.37, 9.82]
openPrice = [10.05, 1.50, 5.25, 9.70]
simulateData = table(securityID, dateTime, lastPrice, openPrice)
tableInsert(tick, simulateData)
(7) Check data in table resultTable.
res = select * from resultTable
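As a rough cross-check, the same factor can be recomputed offline from the mock data with a context by query. This is only a sketch: the table name check is made up for this example, and the query runs on a plain in-memory table rather than on the engine output.

```dolphindb
// Hypothetical in-memory copy of the mock prices
check = table(["AA", "AA", "BB", "AA"] as securityID, [9.99, 1.58, 5.37, 9.82] as lastPrice)

// Apply the same formula as priceChange within each securityID group
select securityID, lastPrice \ prev(lastPrice) - 1 as factor from check context by securityID
```

The first record of each security has no previous price, so its factor should be NULL. For AA, the later records should come out at roughly -0.8418 (1.58 \ 9.99 - 1) and 5.2152 (9.82 \ 1.58 - 1). Because the subscription is processed asynchronously, resultTable may need a brief moment before it contains all rows.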

(8) Unsubscribe from the stream table tick. Note that all subscriptions to a stream table must be canceled before deleting the table.
unsubscribeTable(tableName="tick", actionName="reactiveDemo")
(9) Release the reactive state engine reactiveDemo from memory.
dropStreamEngine(name="reactiveDemo")
(10) Delete the stream tables.
dropStreamTable(tableName="tick")
dropStreamTable(tableName="resultTable")