createReactiveStatelessEngine
语法
createReactiveStatelessEngine(name, metrics, outputTable)
详情
很多时候,数据之间具有很强的依赖关系,某些数据可能依赖于其他数据的最新值。每当所依赖的数据有更新时,希望这些数据可以同步更新。应对此场景,可以使用响应式无状态引擎。
创建一个响应式无状态引擎。该引擎可以定义依赖关系,每当所依赖的数据注入,便会根据最新的所依赖的数据计算出最新结果,将其输出到输出表。
返回一个表对象,向该表写入数据意味着将数据注入引擎。
该表的结构如下
name | typeString |
---|---|
productName | STRING |
metricName | STRING |
value | DOUBLE |
计算规则
引擎每有一批数据注入,会根据参数 metrics 中定义的依赖关系,将任何直接依赖或间接依赖这批数据的数据输出,每次输出的条数等于直接或间接依赖这批数据的变量个数。即使这个变量的值没有改变,也会输出。
参数
name 字符串标量,表示引擎的名称,作为其在一个数据节点/计算节点上的唯一标识。可包含字母,数字和下划线,但必须以字母开头。
metrics 字典向量,其中的每一个元素都是一个字典,代表数据间的一个依赖关系,每个字典的结构如下:
- “outputName”->productName:metricName
- “formula“-><A*B>
- “A“->productName:metricName
- “B“->productName:metricName
其中,键值 outputName 对应的 productName 和 metricName 将分别作为输出表的第一列和第二列;键值 A 和 B 对应的 productName 和 metricName 唯一确定了依赖的数据所在位置,既可以是输入表的数据,也可以是输出表中的数据;键值 formula 对应的元代码,用于表示数据间的依赖关系,上例中 outputName = A * B。
outputTable 输出表,必须为以下结构:
- productName,可以为 STRING 或 SYMBOL 类型。
- metricName,可以为 STRING 或 SYMBOL 类型。
- 计算所得指标的值,即 metrics 中 formula 计算得到的结果,可以为 DOUBLE 或 FLOAT 类型。
例子
现有一个窄表,表中信息如下
productName | metricName | value |
---|---|---|
product_A | factor1 | 1 |
product_A | factor2 | 2 |
product_B | factor1 | 1 |
product_B | value | 4 |
product_C | factor1 | 2 |
product_C | value | 8 |
上表中,product_A:factor1, product_A:factor2, product_B:factor1, product_C:factor1 完全由外部输入决定;一些数据依赖其他数据的值 product_B:value=product_A:factor1+product_A:factor2+product_B:factor1,product_C:value=product_B:value*product_C:factor1
根据以上信息,通过以下脚本创建引擎
// 创建输出表
names = `product`metric`value
types = [STRING, STRING, DOUBLE]
outputTable = table(1:0, names, types)
// 创建 metrics 描述数据间的依赖关系
metrics = array(ANY, 0, 0)
metric1 = dict(STRING,ANY)
// 依赖关系 product_B:factor2=product_A:factor1+product_A:factor2+product_B:factor1
metric1["outputName"] = `product_B:`value
metric1["formula"] = <A+B+C>
metric1["A"] = `product_A:`factor1
metric1["B"] = `product_A:`factor2
metric1["C"] = `product_B:`factor1
metrics.append!(metric1)
// 依赖关系 product_C:value=product_B:value*product_C:factor1
metric2 = dict(STRING, ANY)
metric2["outputName"] =`product_C:`value
metric2["formula"] = <A*B>
metric2["A"] = `product_B:`value
metric2["B"] = `product_C:`factor1
metrics.append!(metric2)
// 创建引擎
engine1 = createReactiveStatelessEngine("engine1", metrics, outputTable)
每次插入数据,无论插入的数据量有多大,都只会返回一次。
第一次插入2条数据,此时,两个依赖关系所需的数据尚不完整,无法计算得到需要的结果,所以输出表中对应值为空。
insert into engine1 values(["product_A","product_A"],["factor1","factor2"],[1,2])
outputTable
product | metric | value |
---|---|---|
product_B | value | |
product_C | value |
第二次插入1条数据,此时,第一个依赖关系所需数据已经完整,故得到结果,第二个依赖关系所需数据仍不完整,对应值为空。
insert into engine1 values("product_B","factor1",1)
outputTable
product | metric | value |
---|---|---|
product_B | value | |
product_C | value | |
product_B | value | 4 |
product_C | value |
第三次插入1条数据,此时,第二个依赖关系所需数据完整,输出对应结果。
insert into engine1 values("product_C","factor1",2)
outputTable
product | metric | value |
---|---|---|
product_B | value | |
product_C | value | |
product_B | value | 4 |
product_C | value | |
product_C | value | 8 |
第四次插入1条数据,此时,由于数据被修改,依赖此数据的相关数据均会受到影响,并将更新后的结果输出。
insert into engine1 values("product_C","factor1",3)
outputTable
product | metric | value |
---|---|---|
product_B | value | |
product_C | value | |
product_B | value | 4 |
product_C | value | |
product_C | value | 8 |
product_C | value | 12 |
注意,数据更新后,即使依赖此数据的相关数据的值最终没有变化,也会将最新结果输出
insert into engine1 values(["product_A","product_A"],["factor1","factor2"],[2,1])
outputTable
product | metric | value |
---|---|---|
product_B | value | |
product_C | value | |
product_B | value | 4 |
product_C | value | |
product_C | value | 8 |
product_C | value | 12 |
product_B | value | 4 |
product_C | value | 12 |
kt = keyedTable(`product`metric, 1:0, `product`metric`value, [STRING, STRING, DOUBLE])