DStream::ruleEngine
语法
DStream::ruleEngine(ruleSets, outputColumns, [policy], [ruleSetColumn], [callback])
详情
创建流计算规则引擎。参考:createRuleEngine。
返回值:一个 DStream 对象。
参数
各参数的说明请参考 createRuleEngine 中的同名参数。
例子
// Create the "orca" catalog only when it does not exist yet, so the example can be re-run.
if (!existsCatalog("orca")) {
createCatalog("orca")
}
// `go` ends this script segment: the statements above are parsed and executed
// before the statements below are parsed.
go
// Make "orca" the current catalog for the rest of the script.
use catalog orca
// If a stream graph named 'engine' already exists, destroy it first by uncommenting:
// dropStreamGraph('engine')
g = createStreamGraph('engine')
// Define the rule sets.
// x holds the rule-set keys; y holds, position by position, the list of
// metacode conditions (<...>) belonging to each key.
// NOTE(review): the NULL key presumably acts as the default rule set for records
// whose key matches no other entry — confirm against the createRuleEngine reference.
x = [1, 2, NULL]
y = [ [ < value > 1 > ], [ < price < 2 >, < price > 6 > ], [ < value*price > 10 > ] ]
// Build the dictionary key -> rules that is passed to ruleEngine as ruleSets.
ruleSets = dict(x, y)
// Create a distributed (DFS) table that the callback function writes data into.
// Drop any existing "dfs://temp" database first so the example starts from a clean state.
if(existsDatabase("dfs://temp")){
dropDatabase("dfs://temp")
}
// VALUE-partitioned database with partition values 1..3.
db = database("dfs://temp", VALUE, 1..3)
// Empty table (0 rows) that only provides the schema: sym INT, value DOUBLE, price DOUBLE.
t1 = table(1:0, `sym`value`price, [INT,DOUBLE,DOUBLE])
// Partitioned table "pt" with the schema of t1, partitioned on column sym.
pt = db.createPartitionedTable(t1,`pt,`sym)
// Callback function: depending on the check result, write data to the DFS table.
// When the first rule result of `result` is false, the sym, value and price
// columns of `result` are appended to table "pt" in database "dfs://temp".
def writeBack(result){
	if(result.rule[0]==false){
		// Keep only the columns that exist in the target table.
		rejected = select sym,value,price from result
		target = loadTable("dfs://temp",`pt)
		target.append!(rejected)
	}
}
// Build the pipeline on stream graph g:
//   source "trades" (columns sym INT, value/price/quantity DOUBLE)
//   -> rule engine that picks the rule set by column sym (ruleSetColumn), uses policy "all",
//      outputs sym, value, price and calls writeBack as its callback
//   -> sink "output".
g.source("trades", 1000:0, `sym`value`price`quantity, [INT, DOUBLE, DOUBLE, DOUBLE])
.ruleEngine(ruleSets=ruleSets, outputColumns=["sym","value","price"], policy="all", ruleSetColumn="sym", callback=writeBack)
.sink("output")
// Submit the stream graph; `go` then ends this script segment so that the
// statements below are parsed only after the submission has been executed.
g.submit()
go
// Two test records, both with sym = 1, so both are checked against the rule set
// for key 1 (value > 1): the first record (value = 0) fails, the second (value = 2) passes.
tmp=table(1 1 as sym, 0 2 as value, 2 2 as price, 3 3 as quantity)
// Feed the records into the Orca stream table "trades".
appendOrcaStreamTable("trades", tmp)
// Query the sink table; the rule column holds the check result(s) of each record.
select * from orca_table.output
| sym | value | price | rule |
|---|---|---|---|
| 1 | 0 | 2 | [false] |
| 1 | 2 | 2 | [true] |
