DStream::reactiveStatelessEngine
Syntax
DStream::reactiveStatelessEngine(metrics)
Details
Creates a reactive stateless engine. For details, see createReactiveStatelessEngine.
Return value: A DStream object.
Arguments
-
"outputName"->productName:metricName
-
"formula"-><expression>
The value of formula is a metacode expression defining the formula for calculation, which can reference other variables. For example, the expression can be
<A*B>
, and A and B are precedent variables for this formula. -
Key-value pairs specifying the precedent variable locations used in the formula above. For exmaple, for
<A*B>
, specify:- "A"->productName:metricName
- "B"->productName:metricName
The productName and metricName uniquely specify the location of the variable, which can be the input or output table.
Examples
if (!existsCatalog("orca")) {
createCatalog("orca")
}
go
use catalog orca
// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('engine')
g = createStreamGraph('engine')
metrics = array(ANY, 0, 0)
metric1 = dict(STRING,ANY)
// product_B:value=product_A:factor1+product_A:factor2+product_B:factor1
metric1["outputName"] = `product_B:`value
metric1["formula"] = <A+B+C>
metric1["A"] = `product_A:`factor1
metric1["B"] = `product_A:`factor2
metric1["C"] = `product_B:`factor1
metrics.append!(metric1)
// product_C:value=product_B:value*product_C:factor1
metric2 = dict(STRING, ANY)
metric2["outputName"] =`product_C:`value
metric2["formula"] = <A*B>
metric2["A"] = `product_B:`value
metric2["B"] = `product_C:`factor1
metrics.append!(metric2)
g.source("input", 1000:0, `product`factor`value, [STRING, STRING, DOUBLE])
.reactiveStatelessEngine(metrics)
.sink("output")
g.submit()
go
products = take("product_A", 2)
factors = ["factor1", "factor2"]
values = [1.0, 2.0]
tmp = table(products as product, factors as factor, values as value)
appendOrcaStreamTable("input", tmp)
products = take("product_B", 1)
factors = take("factor1", 1)
values = take(1.0, 1)
tmp = table(products as product, factors as factor, values as value)
appendOrcaStreamTable("input", tmp)
select * from orca_table.output
productName | metricName | metricsResults |
---|---|---|
product_B | value | |
product_C | value | |
product_B | value | 4 |
product_C | value |