createReactiveStatelessEngine
Syntax
createReactiveStatelessEngine(name, metrics, outputTable)
Details
This function creates a reactive stateless engine and returns a table object with the following schema:
Column Name | Type |
---|---|
productName | STRING |
metricName | STRING |
value | DOUBLE |
Writing data to this table ingests it into the engine for calculation. The reactive stateless engine supports defining dependencies between variables for dynamic calculations.
Calculation Rules
The reactive stateless engine processes each batch of input data and calculates the formulas defined in metrics. Once a precedent value (a variable referenced by a metric) is ingested or updated, all formulas that depend on it are recalculated and output. Each calculation uses the latest values of the variables.
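The rules above can be illustrated with a minimal sketch in plain Python (not DolphinDB script). It is a toy model inferred from the rules and the example on this page, not the engine's implementation. The names `ReactiveModel`, `ingest`, and the `args` key are hypothetical, and the sketch assumes metrics are listed so that precedents come before their dependents.

```python
from typing import Dict, List, Optional, Tuple

Key = Tuple[str, str]  # (productName, metricName)


class ReactiveModel:
    """Toy model of the documented calculation rules (hypothetical, not the engine)."""

    def __init__(self, metrics: List[dict]) -> None:
        # Each metric: {"outputName": Key, "formula": callable, "args": [Key, ...]}
        self.metrics = metrics
        self.values: Dict[Key, Optional[float]] = {}

    def ingest(self, batch: List[Tuple[str, str, float]]) -> List[tuple]:
        # 1. Apply the whole batch first, so formulas see the latest values.
        changed = set()
        for product, metric, value in batch:
            self.values[(product, metric)] = value
            changed.add((product, metric))

        # 2. Recalculate each formula that depends on a changed variable,
        #    directly or through an earlier output. Assumes dependency order.
        rows = []
        for m in self.metrics:
            if changed.isdisjoint(m["args"]):
                continue
            args = [self.values.get(k) for k in m["args"]]
            # A missing precedent yields an empty (None) result.
            result = None if any(a is None for a in args) else m["formula"](*args)
            self.values[m["outputName"]] = result
            changed.add(m["outputName"])
            rows.append((*m["outputName"], result))
        return rows


# The two formulas from the example on this page.
metrics = [
    {"outputName": ("product_B", "value"),
     "formula": lambda a, b, c: a + b + c,
     "args": [("product_A", "factor1"), ("product_A", "factor2"), ("product_B", "factor1")]},
    {"outputName": ("product_C", "value"),
     "formula": lambda a, b: a * b,
     "args": [("product_B", "value"), ("product_C", "factor1")]},
]

model = ReactiveModel(metrics)
print(model.ingest([("product_A", "factor1", 1), ("product_A", "factor2", 2)]))
# [('product_B', 'value', None), ('product_C', 'value', None)]
print(model.ingest([("product_B", "factor1", 1)]))
# [('product_B', 'value', 4), ('product_C', 'value', None)]
```

In this model a batch of several records produces each dependent result at most once, which matches the output tables in the Examples section.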
Arguments
name is a string indicating the engine name. It is the unique identifier of a reactive stateless engine on a data/compute node. It can contain letters, digits, and underscores ("_"), and must start with a letter.
metrics is a vector of dictionaries, specifying the calculation formulas and their dependencies. Each dictionary has the following key-value pairs:
- "outputName"->productName:metricName
- "formula"-><expression>
  The value of formula is a metacode expression defining the formula for calculation, which can reference other variables. For example, the expression can be <A*B>, where A and B are the precedent variables of this formula.
- Key-value pairs specifying the locations of the precedent variables used in the formula above. For example, for <A*B>, specify:
  - "A"->productName:metricName
  - "B"->productName:metricName
The productName and metricName together uniquely identify the location of a variable, which can be in the input table or the output table.
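Because a precedent variable can itself be the output of another formula, the metric dictionaries form a dependency graph. The following plain-Python sketch (not DolphinDB script) mirrors the two formulas from the Examples section as a mapping from each output location to its precedent locations. The names `precedents` and `dependents_of` are hypothetical and serve only as illustration.

```python
from typing import Dict, List, Set, Tuple

Key = Tuple[str, str]  # (productName, metricName)

# Hypothetical mirror of two metric dictionaries:
# output location -> locations of the precedent variables in its formula.
precedents: Dict[Key, List[Key]] = {
    ("product_B", "value"): [("product_A", "factor1"),
                             ("product_A", "factor2"),
                             ("product_B", "factor1")],
    ("product_C", "value"): [("product_B", "value"),
                             ("product_C", "factor1")],
}


def dependents_of(key: Key) -> Set[Key]:
    """Return every output that depends on `key`, directly or via another output."""
    found: Set[Key] = set()
    stack = [key]
    while stack:
        current = stack.pop()
        for output, args in precedents.items():
            if current in args and output not in found:
                found.add(output)
                stack.append(output)  # follow the chain to indirect dependents
    return found


# product_A:factor1 feeds product_B:value, which in turn feeds product_C:value.
print(sorted(dependents_of(("product_A", "factor1"))))
# [('product_B', 'value'), ('product_C', 'value')]
```

Under this reading, an update to product_A:factor1 affects both outputs, while an update to product_C:factor1 affects only product_C:value.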
outputTable is the output table for the results. It can be an in-memory table or a DFS table. Create an empty table and specify the column names and types before calling the function.
The output columns are in the following order:
- productName: STRING or SYMBOL type, as defined in metrics.
- metricName: STRING or SYMBOL type, as defined in metrics.
- A column of DOUBLE or FLOAT type storing the calculation results of the formulas defined in metrics.
Examples
productName | metricName | value |
---|---|---|
product_A | factor1 | 1 |
product_A | factor2 | 2 |
product_B | factor1 | 1 |
product_B | value | 4 |
product_C | factor1 | 2 |
product_C | value | 8 |
In the table above, the factors (e.g., product_A:factor1) are ingested as input, and the values are calculated from other variables:
- product_B:value=product_A:factor1+product_A:factor2+product_B:factor1
- product_C:value=product_B:value*product_C:factor1
Based on the above information, create a reactive stateless engine:
// create output table
names = `product`metric`value
types = [STRING, STRING, DOUBLE]
share table(1:0, names, types) as outputTable
// define metrics
metrics = array(ANY, 0, 0)
metric1 = dict(STRING,ANY)
// product_B:value=product_A:factor1+product_A:factor2+product_B:factor1
metric1["outputName"] = `product_B:`value
metric1["formula"] = <A+B+C>
metric1["A"] = `product_A:`factor1
metric1["B"] = `product_A:`factor2
metric1["C"] = `product_B:`factor1
metrics.append!(metric1)
// product_C:value=product_B:value*product_C:factor1
metric2 = dict(STRING, ANY)
metric2["outputName"] =`product_C:`value
metric2["formula"] = <A*B>
metric2["A"] = `product_B:`value
metric2["B"] = `product_C:`factor1
metrics.append!(metric2)
// create engine
engine1 = createReactiveStatelessEngine("engine1", metrics, outputTable)
(1) Ingest 2 records into the engine. The dependent values cannot be calculated yet because not all of their precedent variables are available, so empty results are output.
insert into engine1 values(["product_A","product_A"],["factor1","factor2"],[1,2])
outputTable
product | metric | value |
---|---|---|
product_B | value | |
product_C | value | |
(2) Then ingest 1 record for product_B:factor1, and the first dependent value can be calculated.
insert into engine1 values("product_B","factor1",1)
outputTable
product | metric | value |
---|---|---|
product_B | value | |
product_C | value | |
product_B | value | 4 |
product_C | value | |
(3) Ingest 1 record for product_C:factor1, and the second dependent value can be calculated.
insert into engine1 values("product_C","factor1",2)
outputTable
product | metric | value |
---|---|---|
product_B | value | |
product_C | value | |
product_B | value | 4 |
product_C | value | |
product_C | value | 8 |
(4) Ingest 1 record which updates product_C:factor1 from 2 to 3. One new result for product_C:value is calculated.
insert into engine1 values("product_C","factor1",3)
outputTable
product | metric | value |
---|---|---|
product_B | value | |
product_C | value | |
product_B | value | 4 |
product_C | value | |
product_C | value | 8 |
product_C | value | 12 |
(5) Whenever a precedent record is updated, the related results are recalculated and output, even if their values do not change.
insert into engine1 values(["product_A","product_A"],["factor1","factor2"],[2,1])
outputTable
product | metric | value |
---|---|---|
product_B | value | |
product_C | value | |
product_B | value | 4 |
product_C | value | |
product_C | value | 8 |
product_C | value | 12 |
product_B | value | 4 |
product_C | value | 12 |