DStream::reactiveStateEngine
Syntax
DStream::reactiveStateEngine(metrics, [keyColumn], [filter], [keepOrder],
[keyPurgeFilter], [keyPurgeFreqInSecond=0], [keyCapacity=1024],
[parallelism=1])
Details
Creates a reactive state engine. For details, see createReactiveStateEngine.
Return value: A DStream object.
Arguments
metrics is metacode specifying the formulas for calculation. The metacode can include one or more expressions, built-in or user-defined functions, or a constant scalar/vector. Note that the output column for a constant vector must be in array vector form. For more information about metacode, refer to Metaprogramming.
(1) To use a user-defined function in the reactive state engine, add @state before the definition to declare the function as a state function. For state functions, the following statements are supported:
- Assignment and return statements
- if...else statements with scalar expressions (since 1.30.21/2.00.9)
- for loops, including break and continue (since 1.30.23/2.00.11). Loop iterations must be under 100 times. Nested for loops are currently unsupported.
(2) Both stateless and state functions can be used in a reactive state engine, but metrics cannot be specified as a stateless function that nests a state function.
(3) If the rvalue of an assignment statement is a built-in or user-defined function that returns multiple values, the values must be assigned to variables at the same time. In the following example, the user-defined state function references linearTimeTrend, which returns two values.
@state
def forcast2(S, N){
    linearregIntercept, linearregSlope = linearTimeTrend(S, N)
    return (N - 1) * linearregSlope + linearregIntercept
}
Note: The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.
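As an illustration of note (3), the state function above could be passed to metrics in a stream graph. This is a minimal sketch, not a verified recipe: the graph name forecastDemo, the stream table names tradesFc and forecastOut, and the window size 5 are hypothetical, and it assumes that a user-defined state function can be referenced from a DStream pipeline in the same way as a built-in function.

```
// State function from note (3): both return values of linearTimeTrend
// are assigned in a single statement.
@state
def forcast2(S, N){
    linearregIntercept, linearregSlope = linearTimeTrend(S, N)
    return (N - 1) * linearregSlope + linearregIntercept
}

if (!existsCatalog("orca")) {
    createCatalog("orca")
}
go
use catalog orca

// Hypothetical names: forecastDemo, tradesFc, forecastOut
g = createStreamGraph('forecastDemo')
g.source("tradesFc", 1000:0, `date`time`sym`price, [DATE, TIME, SYMBOL, DOUBLE])
.reactiveStateEngine(metrics=<forcast2(price, 5)>, keyColumn="sym")
.sink("forecastOut")
g.submit()
```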
keyColumn (optional) is a STRING scalar/vector indicating the grouping column(s). The calculation is conducted within each group.
filter (optional) is metacode that indicates the filtering conditions. A filtering condition must be an expression and only columns of dummyTable can be included. You can specify multiple conditions with logical operators (and, or). Only the results that satisfy the filter conditions are written to the output table.
keepOrder (optional) specifies whether to preserve the insertion order of the records in the output table. If keyColumn contains a time column, the default value is true, and otherwise false.
To clean up the data that is no longer needed after calculation, specify parameters keyPurgeFilter and keyPurgeFreqInSecond.
keyPurgeFilter (optional) indicates the filtering conditions that identify the data to be purged from the cache. It is metacode composed of conditional expressions, and these expressions must refer to the columns in the outputTable. keyPurgeFilter is effective only when keyColumn is specified.
keyPurgeFreqInSecond (optional) is a positive integer indicating the time interval (in seconds) to trigger a purge. keyPurgeFreqInSecond is effective only when keyColumn is specified.
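A minimal sketch of how the two purge parameters might be set together. The graph and table names (purgeDemo, tradesPurge, purgeOut), the cutoff date, and the 10-second interval are hypothetical values chosen for illustration. The filter refers to date, which is a key column and therefore appears in the output table. The sketch assumes the catalog has already been selected with use catalog, as in the Examples section.

```
// Hypothetical names and values; adjust to your own data.
g = createStreamGraph('purgeDemo')
g.source("tradesPurge", 1000:0, `date`time`sym`price, [DATE, TIME, SYMBOL, DOUBLE])
.reactiveStateEngine(metrics=<mavg(price, 3)>, keyColumn=["date","sym"], keyPurgeFilter=<date < 2012.01.05>, keyPurgeFreqInSecond=10)
.sink("purgeOut")
g.submit()
```

With these settings, groups whose date is earlier than 2012.01.05 become candidates for purging, and the check is attempted at most once every 10 seconds.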
For each data ingestion, the engine starts a purge if all of the following conditions are satisfied:
(1) The time elapsed since the last data ingestion is equal to or greater than keyPurgeFreqInSecond (for the first check, the time elapsed between the ingestion of data and the creation of the engine is used).
(2) If the first condition is satisfied, the engine applies keyPurgeFilter to the cached data to get the data to be purged.
(3) The number of groups which contain data to be purged is equal to or greater than 10% of the total number of groups in the engine.
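The three conditions can be summarized as a small decision function. This is only a sketch of the logic described above, not the engine's actual implementation; shouldPurge and its parameter names are hypothetical and are not part of the DolphinDB API.

```
// Hypothetical helper that mirrors the purge conditions.
// elapsedSeconds:       time since the last ingestion (or since engine creation for the first check)
// groupsMatchingFilter: number of groups containing rows selected by keyPurgeFilter
// totalGroups:          total number of groups currently held by the engine
def shouldPurge(elapsedSeconds, keyPurgeFreqInSecond, groupsMatchingFilter, totalGroups){
    // Condition (1): enough time has passed since the last ingestion
    if (elapsedSeconds < keyPurgeFreqInSecond) {
        return false
    }
    // Conditions (2) and (3): the filtered groups make up at least 10% of all groups
    return groupsMatchingFilter >= 0.1 * totalGroups
}

// 30 seconds elapsed with a 10-second interval, but only 5 of 100 groups match:
// below the 10% threshold, so no purge would start.
shouldPurge(30, 10, 5, 100)
// Same timing with 20 of 100 groups matching: all conditions are met.
shouldPurge(30, 10, 20, 100)
```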
To check the engine status before and after the purge, call getStreamEngineStat().ReactiveStreamEngine (see getStreamEngineStat), where the numGroups field indicates the number of groups in the reactive state streaming engine.
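For example, the group count could be inspected as follows. This is a sketch that assumes the returned table also exposes a name column identifying each engine; only numGroups is confirmed by the description above.

```
// Status table for all reactive state engines on the current node
stat = getStreamEngineStat().ReactiveStreamEngine
// name is assumed to exist; numGroups is the number of groups held by each engine
select name, numGroups from stat
```

Running the query once before and once after a purge interval shows whether numGroups has dropped.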
keyCapacity (optional) is a positive integer indicating the amount of memory, measured in rows, allocated for buffering the state of each group (defined by keyColumn). The default value is 1024. For data with a large number of groups, setting this parameter appropriately can reduce latency.
parallelism (optional) is a positive integer no greater than 63, indicating the maximum number of workers that can run in parallel. The default value is 1. For large computation workloads, tuning this parameter can make better use of computing resources and reduce computation time.
Note: parallelism cannot exceed the lesser of the numbers of licensed cores and logical cores minus one.
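A sketch of setting both tuning parameters in one call. The values 4096 and 2 are arbitrary illustrations, and the graph and table names (tuningDemo, tradesTuned, tunedOut) are hypothetical. The parallelism value must still respect the core limit stated in the note above, and the catalog is assumed to have been selected with use catalog, as in the Examples section.

```
// Hypothetical names and values; parallelism=2 assumes the server has enough
// licensed and logical cores to allow it.
g = createStreamGraph('tuningDemo')
g.source("tradesTuned", 1000:0, `date`time`sym`price, [DATE, TIME, SYMBOL, DOUBLE])
.reactiveStateEngine(metrics=<mavg(price, 3)>, keyColumn="sym", keyCapacity=4096, parallelism=2)
.sink("tunedOut")
g.submit()
```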
Examples
if (!existsCatalog("orca")) {
    createCatalog("orca")
}
go
use catalog orca
// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('engine')
g = createStreamGraph('engine')
g.source("trades", 1000:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT])
.reactiveStateEngine(metrics=<mavg(price, 3)>, keyColumn=["date","sym"], filter=<date between 2012.01.01 : 2012.01.03>, keepOrder=true)
.sink("output")
g.submit()
go
n=100
tmp = table(rand(2012.01.01..2012.01.10, n) as date, rand(09:00:00.000..15:59:59.999, n) as time, rand("A"+string(1..10), n) as sym, rand(['B', 'S'], n) as market, rand(100.0, n) as price, rand(1000..2000, n) as qty)
appendOrcaStreamTable("trades", tmp)