DStream::ruleEngine

Syntax

DStream::ruleEngine(ruleSets, outputColumns, [policy], [ruleSetColumn], [callback])

Details

Creates a rule engine that supports multiple rule sets. For details, see createRuleEngine.

Return value: A DStream object.

Arguments

ruleSets is a dictionary specifying the rule sets. Each key is of STRING or INT type, and each value is a tuple of metacode in which every element is one rule. The entry whose key is NULL is the default rule set, which is used for checking data when ruleSetColumn is not specified or when a record does not match any other key. ruleSets must include a default rule set.

outputColumns is a STRING vector indicating the input columns to be preserved in the output table.

policy (optional) is a STRING scalar indicating the rule-checking policy. The supported values are described in createRuleEngine; the example below uses "all".

ruleSetColumn (optional) is a STRING scalar indicating the name of the input column whose value selects the rule set for each record. If it is not set, or if a record's value in that column does not match any key in ruleSets, the default rule set is applied.

callback (optional) is a function that takes a table as input. The table contains a record output by the engine. If specified, the callback function is invoked once for each output record, with that record passed in as the argument. If not specified, the engine only inserts the check results into the output table.
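
As a minimal sketch of how the optional arguments interact, the script below omits policy, ruleSetColumn, and callback, so every record would be checked against the default rule set only. The graph name engineDefaultOnly, the table names tradesDefaultOnly and outputDefaultOnly, and the rule contents are assumptions made for illustration; the catalog setup follows the one used in the Examples section.

```dolphindb
// Assumes the "orca" catalog can be created or already exists.
if (!existsCatalog("orca")) {
	createCatalog("orca")
}
go
use catalog orca

// Hypothetical graph name, chosen to avoid clashing with other examples.
g = createStreamGraph('engineDefaultOnly')

// Key 1 holds one rule; the NULL key marks the default rule set.
ruleSets = dict([1, NULL], [ [ < value > 1 > ], [ < value*price > 10 > ] ])

// No ruleSetColumn is given, so the default rule set (NULL key) applies to every record.
g.source("tradesDefaultOnly", 1000:0, `sym`value`price, [INT, DOUBLE, DOUBLE])
.ruleEngine(ruleSets=ruleSets, outputColumns=["sym","value","price"])
.sink("outputDefaultOnly")
g.submit()
```

In this configuration the rule set stored under key 1 is never selected, because no column routes records to it.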

Examples

if (!existsCatalog("orca")) {
	createCatalog("orca")
}
go
use catalog orca

// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('engine')
g = createStreamGraph('engine')

// define rule sets
x = [1, 2, NULL]
y = [ [ < value > 1 > ], [ < price < 2 >, < price > 6 > ], [ < value*price > 10 > ] ]
ruleSets = dict(x, y)

// create a DFS table for the callback function to write results to
if(existsDatabase("dfs://temp")){
    dropDatabase("dfs://temp")
}
db = database("dfs://temp", VALUE, 1..3)
t1 = table(1:0, `sym`value`price, [INT,DOUBLE,DOUBLE])
pt = db.createPartitionedTable(t1,`pt,`sym)

// create a callback function that saves records failing the check to the DFS table
def writeBack(result){
    if(result.rule[0]==false){
        temp = select sym,value,price from result
        loadTable("dfs://temp",`pt).append!(temp)
    }
}

g.source("trades", 1000:0, `sym`value`price`quantity, [INT, DOUBLE, DOUBLE, DOUBLE])
.ruleEngine(ruleSets=ruleSets, outputColumns=["sym","value","price"], policy="all", ruleSetColumn="sym", callback=writeBack)
.sink("output")
g.submit()
go

tmp=table(1 1 as sym, 0 2 as value, 2 2 as price, 3 3 as quantity)
appendOrcaStreamTable("trades", tmp)

select * from orca_table.output

Output:

sym value price rule
--- ----- ----- -------
1   0     2     [false]
1   2     2     [true]
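
The example above can be extended to inspect what the callback wrote and to exercise the default rule set. The following is a sketch that assumes the example has already been run in the same session; the variable tmp2 and the one-second waits are illustrative assumptions, not part of the original example.

```dolphindb
// Stream processing is asynchronous, so wait briefly before querying
// (the 1000 ms delay is an assumption and may need adjusting).
sleep(1000)

// Records that failed the check were written to the DFS table by writeBack.
select * from loadTable("dfs://temp", `pt)

// sym = 3 has no rule set of its own, so the default rule set
// < value*price > 10 > should be applied to these records.
tmp2 = table(3 3 as sym, 2 6 as value, 2 2 as price, 3 3 as quantity)
appendOrcaStreamTable("trades", tmp2)
sleep(1000)
select * from orca_table.output where sym = 3

// Uncomment to destroy the stream graph when finished.
// dropStreamGraph('engine')
```

Under the default rule set, the first new record (2*2 = 4) would be expected to fail the check and the second (6*2 = 12) to pass it.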