createRuleEngine

语法

createRuleEngine(name, ruleSets, dummyTable, outputColumns, outputTable, [policy], [ruleSetColumn], [callback], [snapshotDir], [snapshotIntervalInMsgCount])

详情

createRuleEngine 用于创建一个规则引擎。

注:
不支持乱序处理机制,用户需要自行保证输入的数据有序。

工作流程

规则引擎能够根据预先定义的规则集,在数据流进入引擎时选择匹配的规则来检查数据,并输出检查结果。规则匹配和检查逻辑如下:

  1. 规则集(ruleSets)是一个字典,其中每一个 key 对应一组规则。key 为 NULL 时代表此规则为默认规则。

  2. 数据流进入引擎时,数据的关键字段(由 ruleSetColumn 指定)与规则集的 key 进行匹配。字段值等于 key 时,代表数据命中规则,将采用此规则检查数据。如果未指定 ruleSetColumn,或数据未命中任何规则,将采用默认规则检查数据。

  3. 检查结果由具体的规则和检查策略(policy)决定。有关 policy 的解释请阅读参数章节。

  4. 如果定义了回调函数,将调用此函数进一步处理数据。

应用场景

规则引擎支持毫秒级实时响应、精准处理大规模时序数据以及动态规则调整,可应用于如下场景:

  • 物联网:设备监测,例如监测设备的温度、湿度;电力监控,例如监控电压、用电量。

  • 金融:风控场景,例如过滤订单、监控股票成交量、设置超量预警信号等。

参数

name 是一个字符串,表示引擎名。

ruleSets 是一个字典,表示规则集。字典的 key 是 STRING 或 INT 类型,value 是一个包含元代码的元组。如果 key 为NULL,表示此条为默认规则。如果未指定 ruleSetColumn,或数据未命中任何规则,将采用默认规则检查数据。一个规则集必须包含默认规则。

dummyTable 一个表对象,和输入的流数据表的 schema 一致,可以含有数据,亦可为空表。

outputColumns 一个 STRING 类型向量,表示输入表中需要保留到输出表的列。

outputTable 是一个表对象,表示输出表,可以是内存表或者分布式表,包含 outputColumns 指示的列,以及一列规则检查的结果列。当参数 policy 设置为 "shortcut" 时,最后一列为 INT 类型的标量;否则,为 BOOL 类型的向量。

policy 是一个字符串标量,表示规则检查策略。可取以下值:
  • "shortcut" 是默认值,代表短路逻辑。当检查到任一规则不符合(计算结果为 false)时,规则引擎立即停止检查并返回该规则的 index(从 0 开始);如果所有规则都符合,则返回 NULL。比如数据匹配到如下规则,当 age 字段的值小于 18 时,计算结果为 false,规则引擎停止检查并返回 0 (规则的 index)到输出表中。

    (< age > 18 >,< income > 5000 >,< debtRatio < 0.5 >)
  • "all" 代表检查全部规则,返回规则集的检查结果,布尔类型。比如数据匹配到如下规则,当 voltage=220current=37.11temperature=39,检查结果为 [false, false, false]

    (< voltage > 221.86 >,< current > 39.96 >,< temperature > 44.43 >)

ruleSetColumn 是一个 STRING 类型标量,为输入表的某一列名,如果没有定义或者输入数据中的该列数据没有命中任何一个规则集,则使用默认规则,即规则集中 key 为 NULL 时所指示的规则。

callback 是一个函数,其参数为一个元组,是引擎输出的一行。若设置此参数,引擎每处理一行,在将引擎处理结果输出到输出表的同时,会将该结果作为入参调用此函数。

snapshotDirsnapshotIntervalInMsgCount 可选参数,无需配置,实际不生效,仅用于兼容 ORCA 框架。

返回值

返回一个表对象。

例子

实现步骤

  1. 创建分布式表,存放回调函数输出的数据。

  2. 定义规则集,用于检查输入的数据。

  3. 定义回调函数,根据检查结果进一步处理数据。

  4. 创建规则引擎。

  5. 向规则引擎输入数据。

示例代码

创建分布式表

// 创建分布式表,存放回调函数输出的数据
db = database("dfs://temp", VALUE, 1..3)
t1 = table(1:0, `sym`value`price, [INT,DOUBLE,DOUBLE])
pt = db.createPartitionedTable(t1,`pt,`sym)

定义规则集

如下规则集包含三组规则,规则的 key 分别为 1、2、NULL,其中 NULL 代表默认规则。

x = [1, 2, NULL]
y = [ [ < value > 1 > ], [ < price < 2 >, < price > 6 > ], [ < value*price > 10 > ] ]
ruleSets = dict(x, y)
/* 输出:
->(< (value * price) > 10 >)
2->(< price < 2 >,< price > 6 >)
1->(< value > 1 >)
*/

定义回调函数

如下回调函数用于判断引擎每一行的检查结果中首个值是否为 false,如是,则将对应数据插入分布式表。

def writeBack(result){
    if(result.rule[0]==false){
        temp = select sym,value,price from result
        loadTable("dfs://temp",`pt).append!(temp)
    }
}

创建规则引擎

names = `sym`value`price`quantity
types = [INT, DOUBLE, DOUBLE, DOUBLE]
dummy = table(1:0, names, types)
outputNames = `sym`value`price`rule
outputTypes = [INT, DOUBLE, DOUBLE, BOOL[]]
outputTable = table(10:0, outputNames, outputTypes)
test = createRuleEngine(
        name="ruleEngineTest",
        ruleSets=ruleSets,
        dummyTable=dummy,
        outputColumns=["sym","value","price"],
        outputTable=outputTable,
        policy="all",
        ruleSetColumn="sym",
        callback=writeBack)

向规则引擎输入数据

插入两条 sym=1 的数据,此时会根据规则集 ruleSetskey=1 对应的规则,即 value >1 来检查这两条数据。

test.append!(table(1 as sym, 0 as value, 2 as price, 3 as quantity))
test.append!(table(1 as sym, 2 as value, 2 as price, 3 as quantity))

插入三条 sym=2 的数据,此时会根据规则集 ruleSetskey=2 对应的规则,即 price < 2price > 6 来检查这三条数据。

test.append!(table(2 as sym, 2 as value, 0 as price, 3 as quantity))
test.append!(table(2 as sym, 2 as value, 4 as price, 3 as quantity))
test.append!(table(2 as sym, 2 as value, 8 as price, 3 as quantity))

插入两条 sym=3 的数据,由于规则集 ruleSets 只设置了键值为 1 和 2 的规则,此时会根据规则集中 key=NULL 对应的规则,即 value*price > 10 来检查这两条数据。

test.append!(table(3 as sym, 2 as value, 3 as price, 3 as quantity))
test.append!(table(3 as sym, 2 as value, 6 as price, 3 as quantity))

查看输出表 outputTable 的内容。

select * from outputTable
sym value price rule
1 0 2 [false]
1 2 2 [true]
2 2 0 [true,false]
2 2 4 [false,false]
2 2 8 [false,true]
3 2 3 [false]
3 2 6 [true]

回调函数处理后,所有检查结果中首个值为false的行均被写入了分布式表 dfs://temp/pt。

select * from loadTable("dfs://temp","pt")
sym value price
1 0 2
2 2 4
2 2 8
3 2 3

相关函数:updateRuledeleteRulegetRules