createRuleEngine
语法
createRuleEngine(name, ruleSets, dummyTable, outputColumns, outputTable,
[policy], [ruleSetColumn], [callback], [snapshotDir],
[snapshotIntervalInMsgCount])
详情
createRuleEngine 用于创建一个规则引擎。
工作流程
规则引擎能够根据预先定义的规则集,在数据流进入引擎时选择匹配的规则来检查数据,并输出检查结果。规则匹配和检查逻辑如下:
-
规则集(ruleSets)是一个字典,其中每一个 key 对应一组规则。key 为 NULL 时代表此规则为默认规则。
-
数据流进入引擎时,数据的关键字段(由 ruleSetColumn 指定)与规则集的 key 进行匹配。字段值等于 key 时,代表数据命中规则,将采用此规则检查数据。如果未指定 ruleSetColumn,或数据未命中任何规则,将采用默认规则检查数据。
-
检查结果由具体的规则和检查策略(policy)决定。有关 policy 的解释请阅读参数章节。
-
如果定义了回调函数,将调用此函数进一步处理数据。
应用场景
规则引擎支持毫秒级实时响应、精准处理大规模时序数据以及动态规则调整,可应用于如下场景:
-
物联网:设备监测,例如监测设备的温度、湿度;电力监控,例如监控电压、用电量。
-
金融:风控场景,例如过滤订单、监控股票成交量、设置超量预警信号等。
参数
name 是一个字符串,表示引擎名。
ruleSets 是一个字典,表示规则集。字典的 key 是 STRING 或 INT 类型,value 是一个包含元代码的元组。如果 key 为NULL,表示此条为默认规则。如果未指定 ruleSetColumn,或数据未命中任何规则,将采用默认规则检查数据。一个规则集必须包含默认规则。
dummyTable 一个表对象,和输入的流数据表的 schema 一致,可以含有数据,亦可为空表。
outputColumns 一个 STRING 类型向量,表示输入表中需要保留到输出表的列。
outputTable 是一个表对象,表示输出表,可以是内存表或者分布式表,包含 outputColumns 指示的列,以及一列规则检查的结果列。当参数 policy 设置为 "shortcut" 时,最后一列为 INT 类型的标量;否则,为 BOOL 类型的向量。
-
"shortcut" 是默认值,代表短路逻辑。当检查到任一规则不符合(计算结果为 false)时,规则引擎立即停止检查并返回该规则的 index(从 0 开始);如果所有规则都符合,则返回 NULL。比如数据匹配到如下规则,当 age 字段的值小于 18 时,计算结果为 false,规则引擎停止检查并返回 0 (规则的 index)到输出表中。
(< age > 18 >,< income > 5000 >,< debtRatio < 0.5 >) -
"all" 代表检查全部规则,返回规则集的检查结果,布尔类型。比如数据匹配到如下规则,当
voltage=220、current=37.11、temperature=39,检查结果为[false, false, false]。(< voltage > 221.86 >,< current > 39.96 >,< temperature > 44.43 >)
ruleSetColumn 是一个 STRING 类型标量,为输入表的某一列名,如果没有定义或者输入数据中的该列数据没有命中任何一个规则集,则使用默认规则,即规则集中 key 为 NULL 时所指示的规则。
callback 是一个函数,其参数为一个元组,是引擎输出的一行。若设置此参数,引擎每处理一行,在将引擎处理结果输出到输出表的同时,会将该结果作为入参调用此函数。
snapshotDir 与 snapshotIntervalInMsgCount 可选参数,无需配置,实际不生效,仅用于兼容 ORCA 框架。
返回值
返回一个表对象。
例子
实现步骤
-
创建分布式表,存放回调函数输出的数据。
-
定义规则集,用于检查输入的数据。
-
定义回调函数,根据检查结果进一步处理数据。
-
创建规则引擎。
-
向规则引擎输入数据。
示例代码
创建分布式表
// 创建分布式表,存放回调函数输出的数据
db = database("dfs://temp", VALUE, 1..3)
t1 = table(1:0, `sym`value`price, [INT,DOUBLE,DOUBLE])
pt = db.createPartitionedTable(t1,`pt,`sym)
定义规则集
如下规则集包含三组规则,规则的 key 分别为 1、2、NULL,其中 NULL 代表默认规则。
x = [1, 2, NULL]
y = [ [ < value > 1 > ], [ < price < 2 >, < price > 6 > ], [ < value*price > 10 > ] ]
ruleSets = dict(x, y)
/* 输出:
->(< (value * price) > 10 >)
2->(< price < 2 >,< price > 6 >)
1->(< value > 1 >)
*/
定义回调函数
如下回调函数用于判断引擎每一行的检查结果中首个值是否为 false,如是,则将对应数据插入分布式表。
def writeBack(result){
if(result.rule[0]==false){
temp = select sym,value,price from result
loadTable("dfs://temp",`pt).append!(temp)
}
}
创建规则引擎
names = `sym`value`price`quantity
types = [INT, DOUBLE, DOUBLE, DOUBLE]
dummy = table(1:0, names, types)
outputNames = `sym`value`price`rule
outputTypes = [INT, DOUBLE, DOUBLE, BOOL[]]
outputTable = table(10:0, outputNames, outputTypes)
test = createRuleEngine(
name="ruleEngineTest",
ruleSets=ruleSets,
dummyTable=dummy,
outputColumns=["sym","value","price"],
outputTable=outputTable,
policy="all",
ruleSetColumn="sym",
callback=writeBack)
向规则引擎输入数据
插入两条 sym=1 的数据,此时会根据规则集 ruleSets 中 key=1
对应的规则,即 value >1 来检查这两条数据。
test.append!(table(1 as sym, 0 as value, 2 as price, 3 as quantity))
test.append!(table(1 as sym, 2 as value, 2 as price, 3 as quantity))
插入三条 sym=2 的数据,此时会根据规则集 ruleSets 中 key=2
对应的规则,即 price < 2 和 price > 6 来检查这三条数据。
test.append!(table(2 as sym, 2 as value, 0 as price, 3 as quantity))
test.append!(table(2 as sym, 2 as value, 4 as price, 3 as quantity))
test.append!(table(2 as sym, 2 as value, 8 as price, 3 as quantity))
插入两条 sym=3 的数据,由于规则集 ruleSets 只设置了键值为 1 和 2 的规则,此时会根据规则集中
key=NULL 对应的规则,即 value*price > 10
来检查这两条数据。
test.append!(table(3 as sym, 2 as value, 3 as price, 3 as quantity))
test.append!(table(3 as sym, 2 as value, 6 as price, 3 as quantity))
查看输出表 outputTable 的内容。
select * from outputTable
| sym | value | price | rule |
|---|---|---|---|
| 1 | 0 | 2 | [false] |
| 1 | 2 | 2 | [true] |
| 2 | 2 | 0 | [true,false] |
| 2 | 2 | 4 | [false,false] |
| 2 | 2 | 8 | [false,true] |
| 3 | 2 | 3 | [false] |
| 3 | 2 | 6 | [true] |
回调函数处理后,所有检查结果中首个值为false的行均被写入了分布式表 dfs://temp/pt。
select * from loadTable("dfs://temp","pt")
| sym | value | price |
|---|---|---|
| 1 | 0 | 2 |
| 2 | 2 | 4 |
| 2 | 2 | 8 |
| 3 | 2 | 3 |
相关函数:updateRule、deleteRule、getRules
