createReactiveStatelessEngine

Syntax

createReactiveStatelessEngine(name, metrics, outputTable)

Details

This function creates a reactive stateless engine and returns a table object with the following schema:

Column Name    Type
productName    STRING
metricName     STRING
value          DOUBLE

Writing to this table ingests data into the engine for calculation. The reactive stateless engine lets you define dependencies between variables, so that formulas are recalculated dynamically as their inputs arrive.

Calculation Rules

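The reactive stateless engine processes input data batch by batch and calculates the formulas defined in metrics. Whenever a precedent variable (a variable referenced by a formula) is ingested or updated, every formula that depends on it, directly or indirectly, is recalculated and its result is written to the output table. Each calculation uses the latest values of the variables; if a precedent has not been ingested yet, the result is empty.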
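To illustrate which formulas are recalculated after an update, the following Python sketch inverts the dependency graph of the example in the Examples section and collects every output reachable from the updated variables, ordered so that precedents come first. It is a hypothetical model, not DolphinDB code; affected_outputs and DEPENDENCIES are illustrative names, and the graph is assumed to be acyclic.

```python
from collections import defaultdict

# Hypothetical model of the dependency rule; not DolphinDB code.
# Each output maps to the precedent variables its formula references,
# mirroring the example in the Examples section.
DEPENDENCIES = {
    "product_B:value": ["product_A:factor1", "product_A:factor2", "product_B:factor1"],
    "product_C:value": ["product_B:value", "product_C:factor1"],
}


def affected_outputs(deps, changed):
    """Return every output that depends, directly or indirectly, on any
    variable in `changed`, ordered so that precedents come first."""
    # Invert the graph: precedent -> outputs whose formulas reference it.
    dependents = defaultdict(list)
    for output, precedents in deps.items():
        for p in precedents:
            dependents[p].append(output)

    # Collect all reachable outputs with a stack-based traversal.
    affected = set()
    stack = list(changed)
    while stack:
        var = stack.pop()
        for out in dependents[var]:
            if out not in affected:
                affected.add(out)
                stack.append(out)

    # Order the affected outputs so each follows its affected precedents
    # (assumes the dependency graph has no cycles).
    ordered, visited = [], set()

    def visit(out):
        if out in visited:
            return
        visited.add(out)
        for p in deps.get(out, []):
            if p in affected:
                visit(p)
        ordered.append(out)

    for out in sorted(affected):
        visit(out)
    return ordered


print(affected_outputs(DEPENDENCIES, ["product_A:factor1"]))  # ['product_B:value', 'product_C:value']
print(affected_outputs(DEPENDENCIES, ["product_C:factor1"]))  # ['product_C:value']
```

Updating product_A:factor1 reaches product_C:value only through product_B:value, which is why both outputs are listed, whereas product_C:factor1 feeds product_C:value alone.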

Arguments

name is a string specifying the engine name. It uniquely identifies a reactive stateless engine on a data/compute node. It can contain letters, digits, and underscores (_) and must start with a letter.
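A minimal Python sketch of this naming rule, assuming "letter" means an ASCII letter. is_valid_engine_name is a hypothetical helper for illustration only; the engine performs its own validation.

```python
import re

# Hypothetical check mirroring the documented rule: letters, digits and "_",
# starting with a letter (ASCII letters assumed).
_ENGINE_NAME = re.compile(r"[A-Za-z][A-Za-z0-9_]*")


def is_valid_engine_name(name: str) -> bool:
    """Return True if `name` satisfies the documented naming rule."""
    return _ENGINE_NAME.fullmatch(name) is not None


print(is_valid_engine_name("engine1"))    # True
print(is_valid_engine_name("1engine"))    # False
print(is_valid_engine_name("my-engine"))  # False
```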

metrics is a vector of dictionaries, specifying the calculation formulas and their dependencies. Each dictionary has the following key-value pairs:

  • "outputName"->productName:metricName

    The location (productName:metricName) where the result of the formula is stored.

  • "formula"-><expression>

    The value is a metacode expression defining the calculation, which can reference other variables. For example, in the expression <A*B>, A and B are the precedent variables of the formula.

  • Key-value pairs specifying the locations of the precedent variables used in the formula. For example, for <A*B>, specify:

    • "A"->productName:metricName
    • "B"->productName:metricName

    The pair productName:metricName uniquely identifies a variable, which can come from either the input table or the output table (i.e., the result of another formula).
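The structure of a metrics entry can be mirrored in Python for illustration. In this hypothetical sketch each dictionary is a Python dict, locations are (productName, metricName) tuples, and the metacode formula is replaced by a plain string (an assumption, since Python has no metacode). unbound_variables reports any variable used in the formula that has no location binding.

```python
import ast

# Hypothetical Python stand-in for one element of `metrics`. In DolphinDB the
# formula is metacode such as <A+B+C>; here it is assumed to be a plain string.
metric1 = {
    "outputName": ("product_B", "value"),
    "formula": "A+B+C",
    "A": ("product_A", "factor1"),
    "B": ("product_A", "factor2"),
    "C": ("product_B", "factor1"),
}

RESERVED_KEYS = {"outputName", "formula"}


def precedent_bindings(metric):
    """Return {variable: (productName, metricName)} for every precedent key."""
    return {k: v for k, v in metric.items() if k not in RESERVED_KEYS}


def unbound_variables(metric):
    """Return the names used in the formula that have no location binding."""
    tree = ast.parse(metric["formula"], mode="eval")
    used = {node.id for node in ast.walk(tree) if isinstance(node, ast.Name)}
    return used - set(precedent_bindings(metric))


print(precedent_bindings(metric1)["A"])  # ('product_A', 'factor1')
print(unbound_variables(metric1))        # set()
```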

outputTable is the output table for the results. It can be an in-memory table or a DFS table. Create an empty table and specify the column names and types before calling the function.

The output columns are in the following order:

  • productName: STRING or SYMBOL type, as defined in metrics.
  • metricName: STRING or SYMBOL type, as defined in metrics.
  • A column of DOUBLE or FLOAT type storing the results of the formulas defined in metrics.

Examples

productName  metricName  value
product_A    factor1     1
product_A    factor2     2
product_B    factor1     1
product_B    value       4
product_C    factor1     2
product_C    value       8

In the table above, the factors (e.g., product_A:factor1) are ingested into the engine, and the values are calculated from other variables:

  • product_B:value=product_A:factor1+product_A:factor2+product_B:factor1

  • product_C:value=product_B:value*product_C:factor1
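The values in the table follow directly from these two formulas; a plain Python check of the arithmetic:

```python
# Factors ingested in the example table (productName:metricName -> value).
inputs = {
    "product_A:factor1": 1,
    "product_A:factor2": 2,
    "product_B:factor1": 1,
    "product_C:factor1": 2,
}

# product_B:value = product_A:factor1 + product_A:factor2 + product_B:factor1
b_value = inputs["product_A:factor1"] + inputs["product_A:factor2"] + inputs["product_B:factor1"]
# product_C:value = product_B:value * product_C:factor1
c_value = b_value * inputs["product_C:factor1"]

print(b_value, c_value)  # 4 8
```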

Based on the above information, create a reactive stateless engine:

// create output table
names = `product`metric`value
types = [STRING, STRING, DOUBLE]
share table(1:0, names, types) as outputTable

// define metrics
metrics = array(ANY, 0, 0)
metric1 = dict(STRING,ANY)
// product_B:value=product_A:factor1+product_A:factor2+product_B:factor1
metric1["outputName"] = `product_B:`value
metric1["formula"] = <A+B+C>
metric1["A"] = `product_A:`factor1
metric1["B"] = `product_A:`factor2
metric1["C"] = `product_B:`factor1
metrics.append!(metric1)
// product_C:value=product_B:value*product_C:factor1
metric2 = dict(STRING, ANY)
metric2["outputName"] = `product_C:`value
metric2["formula"] = <A*B>
metric2["A"] = `product_B:`value
metric2["B"] = `product_C:`factor1
metrics.append!(metric2)

// create engine
engine1 = createReactiveStatelessEngine("engine1", metrics, outputTable)

(1) Ingest 2 records into the engine. The dependent values cannot be calculated yet because not all of their precedent variables have been ingested, so empty results are output for product_B:value and product_C:value.

insert into engine1 values(["product_A","product_A"],["factor1","factor2"],[1,2])
outputTable
product    metric  value
product_B  value
product_C  value

(2) Ingest 1 record for product_B:factor1. product_B:value can now be calculated (1+2+1=4), while product_C:value is still empty because product_C:factor1 has not been ingested.

insert into engine1 values("product_B","factor1",1)
outputTable
product    metric  value
product_B  value
product_C  value
product_B  value   4
product_C  value

(3) Ingest 1 record for product_C:factor1. product_C:value can now be calculated (4*2=8).

insert into engine1 values("product_C","factor1",2)
outputTable
product    metric  value
product_B  value
product_C  value
product_B  value   4
product_C  value
product_C  value   8

(4) Ingest 1 record that updates product_C:factor1 from 2 to 3. Only product_C:value depends on it, so a single new result (4*3=12) is output.

insert into engine1 values("product_C","factor1",3)
outputTable
product    metric  value
product_B  value
product_C  value
product_B  value   4
product_C  value
product_C  value   8
product_C  value   12

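(5) Ingest 2 records that update product_A:factor1 from 1 to 2 and product_A:factor2 from 2 to 1. Whenever a precedent is updated, the dependent results are output even if their values do not change: product_B:value remains 2+1+1=4 and product_C:value remains 4*3=12.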

insert into engine1 values(["product_A","product_A"],["factor1","factor2"],[2,1])
outputTable
product    metric  value
product_B  value
product_C  value
product_B  value   4
product_C  value
product_C  value   8
product_C  value   12
product_B  value   4
product_C  value   12
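The five steps can be replayed with a small Python model to make the output rules concrete. ReactiveStatelessModel is a hypothetical class, not part of DolphinDB and not a description of its internals; it assumes that metrics are listed so that each formula's precedents come before it, represents empty results as None, and treats each insert call as one batch.

```python
from typing import Callable, Dict, List, Optional, Sequence, Tuple

Var = Tuple[str, str]  # (productName, metricName)
Metric = Tuple[Var, Callable[..., float], Sequence[Var]]


class ReactiveStatelessModel:
    """Hypothetical stand-in for the engine; records what it would output."""

    def __init__(self, metrics: List[Metric]):
        # Assumption: metrics are listed so that precedents come before dependents.
        self.metrics = metrics
        self.values: Dict[Var, Optional[float]] = {}
        self.output: List[Tuple[str, str, Optional[float]]] = []

    def insert(self, rows: List[Tuple[str, str, float]]) -> None:
        """Ingest one batch of (productName, metricName, value) rows."""
        changed = set()
        for product, metric, value in rows:
            self.values[(product, metric)] = value
            changed.add((product, metric))
        for out, formula, precedents in self.metrics:
            # Skip formulas that no variable updated in this batch reaches.
            if changed.isdisjoint(precedents):
                continue
            args = [self.values.get(p) for p in precedents]
            # Missing precedents yield an empty (None) result, as in step (1).
            result = None if None in args else formula(*args)
            self.values[out] = result
            changed.add(out)  # lets formulas that depend on this output fire too
            self.output.append((out[0], out[1], result))


engine = ReactiveStatelessModel([
    (("product_B", "value"), lambda a, b, c: a + b + c,
     [("product_A", "factor1"), ("product_A", "factor2"), ("product_B", "factor1")]),
    (("product_C", "value"), lambda a, b: a * b,
     [("product_B", "value"), ("product_C", "factor1")]),
])

engine.insert([("product_A", "factor1", 1), ("product_A", "factor2", 2)])  # step (1)
engine.insert([("product_B", "factor1", 1)])                                # step (2)
engine.insert([("product_C", "factor1", 2)])                                # step (3)
engine.insert([("product_C", "factor1", 3)])                                # step (4)
engine.insert([("product_A", "factor1", 2), ("product_A", "factor2", 1)])  # step (5)

for row in engine.output:
    print(row)
```

The printed rows correspond to the eight rows of the output table after step (5), with None in place of the empty values. Because each batch is processed as a whole, the two records ingested in steps (1) and (5) produce one result per dependent formula rather than one per record.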