DStream::ruleEngine

语法

DStream::ruleEngine(ruleSets, outputColumns, [policy], [ruleSetColumn], [callback])

详情

创建流计算规则引擎。参考:createRuleEngine

返回值:一个 DStream 对象。

参数

ruleSets、outputColumns、policy、ruleSetColumn、callback 的说明请参考 createRuleEngine 中的同名参数。

例子

// Create the "orca" catalog only if it does not exist yet.
if (!existsCatalog("orca")) {
	createCatalog("orca")
}
// `go` makes the statements above run before the rest of the script is parsed.
go
// Switch the session to the "orca" catalog.
use catalog orca

// If the stream graph already exists, destroy it first (uncomment the next line).
// dropStreamGraph('engine')
// Create a stream graph named 'engine'; g is used below to build the pipeline.
g = createStreamGraph('engine')

// Define the rule sets: a dictionary whose keys (x) each map to a list of
// metacode conditions (y). The pipeline below passes ruleSetColumn="sym".
// NOTE(review): presumably the NULL key is the default rule set for sym values
// that have no entry of their own - confirm against createRuleEngine.
x = [1, 2, NULL]
y = [ [ < value > 1 > ], [ < price < 2 >, < price > 6 > ], [ < value*price > 10 > ] ]
ruleSets = dict(x, y)

// Create the DFS table that the callback function writes data into.
// Drop any existing "dfs://temp" database first so the example can be re-run.
if(existsDatabase("dfs://temp")){
    dropDatabase("dfs://temp")
}
// VALUE-partitioned database over 1..3.
db = database("dfs://temp", VALUE, 1..3)
// Empty in-memory table that supplies the schema (sym, value, price).
t1 = table(1:0, `sym`value`price, [INT,DOUBLE,DOUBLE])
// Partitioned table "pt", partitioned on the sym column.
pt = db.createPartitionedTable(t1,`pt,`sym)

// Callback passed to the rule engine. When the `rule` value of the first row
// of `result` equals false, the sym/value/price columns of `result` are
// appended to the DFS table "pt" in "dfs://temp".
def writeBack(result){
    if(result.rule[0]==false){
        failedRows = select sym,value,price from result
        target = loadTable("dfs://temp",`pt)
        target.append!(failedRows)
    }
}

// Build the pipeline: the source stream table "trades" feeds the rule engine,
// whose output goes to the sink table "output". writeBack is registered as the
// engine's callback.
g.source("trades", 1000:0, `sym`value`price`quantity, [INT, DOUBLE, DOUBLE, DOUBLE])
.ruleEngine(ruleSets=ruleSets, outputColumns=["sym","value","price"], policy="all", ruleSetColumn="sym", callback=writeBack)
.sink("output")
// Submit the stream graph.
g.submit()
go

// Two test rows, both with sym=1 (value 0 and value 2); the rule set keyed by 1 is `value > 1`.
tmp=table(1 1 as sym, 0 2 as value, 2 2 as price, 3 3 as quantity)
// Write the rows into the "trades" stream table of the graph.
appendOrcaStreamTable("trades", tmp)

// Query the sink table; the expected result is listed right after this statement.
select * from orca_table.output
sym value price rule
1 0 2 [false]
1 2 2 [true]