DStream::sparseReactiveStateEngine

Syntax

DStream::sparseReactiveStateEngine(metrics, keyColumn, [extraColumn])

Parameters

metrics is a table that defines the sparse state computation rules, one rule per row. It must contain at least three columns: the identifier column(s) corresponding to keyColumn, formula, and outputMetricKey.
  • The first N columns are the input metric identifier columns; their number and order must match keyColumn. In each row, they hold the keyColumn values that identify the input metric the rule applies to (e.g., if the input table identifies metrics by deviceID, the values might be "A001", "R131", and so on).
  • formula is a STRING scalar/vector or metacode specifying the expression that computes the metric. Variable names in the expression refer to value columns of the input table, such as metricValue, or several value columns such as Value1 and Value2.
  • outputMetricKey is a STRING scalar/vector giving the name of the new output metric, e.g., "A001_event_B". Each name must be unique.

Note: In Orca, a formula that calls a user-defined function must be written as metacode enclosed in <>.
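As a minimal sketch of the layout described above, the following builds a rule table for an input stream keyed by a single column. The column names deviceId and metricValue and the output metric names are hypothetical. The formulas are written as metacode, the form Orca requires whenever a formula calls a user-defined function.

```dolphindb
// Hypothetical rule table for keyColumn = `deviceId (one identifier column)
metrics = table(
    ["A001", "R131"] as deviceId,                                 // which input metric each rule applies to
    [<cumsum(metricValue)>, <cumavg(metricValue)>] as formula,    // expressions over the input value column
    ["A001_cumsum", "R131_cumavg"] as outputMetricKey             // unique output metric names
)
```

Each row pairs one key value with one formula and one output name; adding a rule means adding a row rather than changing the engine definition.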

keyColumn is a STRING scalar/vector specifying the key column(s) of the input table, which identify the metric or device each incoming row belongs to. If keyColumn has N columns, the first N columns of metrics must be the corresponding identifier columns.
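The correspondence between keyColumn and the leading columns of metrics is easy to get wrong. The helper below is a hypothetical sketch, not part of DolphinDB or Orca, that checks a rule table against the constraints listed above: the first N columns line up with keyColumn, formula and outputMetricKey are present, and output metric names are unique. It assumes keyColumn is passed as a STRING vector and that the identifier columns carry the same names as the keyColumn entries (as deviceId1 and deviceId2 do in the Examples section).

```dolphindb
// Hypothetical helper: returns true if the rule table matches the expected layout
def checkMetricsLayout(metrics, keyColumn) {
    cols = columnNames(metrics)
    n = size(keyColumn)
    // The first n columns must be the key columns, in the same order as keyColumn
    keysOk = all(cols[0:n] == keyColumn)
    // Both required rule columns must be present
    colsOk = ("formula" in cols) && ("outputMetricKey" in cols)
    // Every output metric name must be unique
    uniqueOk = size(distinct(metrics.outputMetricKey)) == rows(metrics)
    return keysOk && colsOk && uniqueOk
}

rules = table(
    ["A001", "R131"] as deviceId,
    [<cumsum(metricValue)>, <cumavg(metricValue)>] as formula,
    ["A001_cumsum", "R131_cumavg"] as outputMetricKey
)
ok = checkMetricsLayout(rules, ["deviceId"])
```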

extraColumn (optional) is a STRING scalar/vector indicating the column names in the input table that should be carried to the output table unchanged (e.g., time columns).

Returns

A DStream object.

Examples

// Create a catalog if it does not exist
if (!existsCatalog("orca")) {
    createCatalog("orca")
}
go
use catalog orca

// If a stream graph with the same name already exists, destroy it first
// dropStreamGraph("sparseGraph")
g = createStreamGraph("sparseGraph")

// Define the input stream schema and the rule table (metrics), then build the sparse reactive state engine

baseStream = g.source("trade", `timestamp`date`deviceId1`deviceId2`deviceId3`value1`value2`value3, [TIMESTAMP, DATE, STRING, STRING, STRING, DOUBLE, DOUBLE, DOUBLE])
formulas = [<cumsumTopN(value1, value2, 5)>, <cumavgTopN(value1, value2, 10)>, <cumstdTopN(value1, value2, 15)>, <cumstdpTopN(value1, value2, 20)>, <cumvarTopN(value1, value2, 5)>, <cumvarpTopN(value1, value2, 10)>, <cumskewTopN(value1, value2, 10)>, <cumkurtosisTopN(value1, value2, 10)>, <cumbetaTopN(value1, value2, value3, 10)>, <cumcorrTopN(value1, value2, value3, 10)>, <cumcovarTopN(value1, value2, value3, 10)>, <cumwsumTopN(value1, value2, value3, 10)>]
keys = "A"+string(1..size(formulas))
keys1 = keys.shuffle()
keys2 = keys.shuffle()
outKeys = "event"+string(1..size(formulas))
metrics = table(
    keys1 as deviceId1,
    keys2 as deviceId2,
    formulas as formula,
    outKeys as outputMetricKey
)
baseStream.sparseReactiveStateEngine(metrics, `deviceId1`deviceId2, `timestamp`date)
.setEngineName("srsEngine")
.sink("output")
g.submit()
go

// Append data and view the output
n = 10000
for(i in 1..5){
    data = table(rand(timestamp(1..1000), n) as timestamp, rand(date(1..1000), n) as date, rand(keys, n) as deviceId1, rand(keys, n) as deviceId2, take(keys, n) as deviceId3, rand(rand(-1000.0:1000.0, n) join take(double(), n/5), n) as value1, rand(rand(-1000.0:1000.0, n) join take(double(), n/5), n) as value2, rand(rand(-1000.0:1000.0, n) join take(double(), n/5), n) as value3)
    appendOrcaStreamTable("trade", data)
}
sleep(3000)
res = select * from orca_table.output
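For comparison, a stripped-down variant with a single key column and two rules might look like the following. It is a sketch that reuses only the calls from the example above; the graph, stream table, engine, and column names are hypothetical, and the "orca" catalog is assumed to exist already.

```dolphindb
// Minimal sketch: one key column, two rules. Assumes the "orca" catalog exists.
use catalog orca

// dropStreamGraph("sparseGraphMini")  // uncomment if the graph already exists
g2 = createStreamGraph("sparseGraphMini")

// Rule table: cumsum is tracked for A001, cumavg for R131
rulesMini = table(
    ["A001", "R131"] as deviceId,
    [<cumsum(metricValue)>, <cumavg(metricValue)>] as formula,
    ["A001_cumsum", "R131_cumavg"] as outputMetricKey
)

src = g2.source("tradeMini", `ts`deviceId`metricValue, [TIMESTAMP, STRING, DOUBLE])
src.sparseReactiveStateEngine(rulesMini, `deviceId, `ts)
.setEngineName("srsEngineMini")
.sink("outputMini")
g2.submit()
go

// Four input rows, two per device
data = table(
    timestamp(1..4) as ts,
    ["A001", "R131", "A001", "R131"] as deviceId,
    [1.0, 2.0, 3.0, 4.0] as metricValue
)
appendOrcaStreamTable("tradeMini", data)
sleep(1000)
resMini = select * from orca_table.outputMini
```

Keeping the rule set this small makes it easier to check each output row by hand before scaling up to many keys and formulas.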

Related function: createSparseReactiveStateEngine