DStream::ruleEngine

Syntax

DStream::ruleEngine(ruleSets, outputColumns, [policy], [ruleSetColumn], [callback])

Details

Creates a rule engine that supports multiple rule sets. For details, see createRuleEngine.

Return value: A DStream object.

Arguments

ruleSets is a dictionary specifying the rule sets. Each key is of STRING or INT type and identifies a rule set; each value is a tuple of metacode, with one element per rule. The dictionary must contain a default rule set whose key is NULL.

outputColumns is a STRING vector indicating the input columns to be preserved in the output table.

policy (optional) is a STRING scalar indicating the checking policy of rule sets. It can take the following values:
  • shortcut (default): When any check result is false, the index (starting from 0) of the corresponding rule within the applied rule set is returned. Otherwise NULL is returned.

  • all: Check all rules in the applied rule set and return an array vector of BOOLEAN type whose elements are the check results of the individual rules.
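The difference between the two policies can be sketched in plain Python. This is only a model of the behavior described above, not DolphinDB code and not the engine's implementation: the `check` function and the lambda rules are hypothetical stand-ins for metacode rules, and the sketch assumes that "shortcut" reports the index of the first failing rule in the applied rule set.

```python
from typing import Callable, Dict, List, Optional, Union

Row = Dict[str, float]
Rule = Callable[[Row], bool]


def check(rules: List[Rule], row: Row,
          policy: str = "shortcut") -> Union[Optional[int], List[bool]]:
    """Model of the two checking policies (hypothetical helper)."""
    if policy == "shortcut":
        # Assumption: stop at the first failing rule and report its 0-based index.
        for index, rule in enumerate(rules):
            if not rule(row):
                return index
        return None  # every rule passed; corresponds to NULL in the output
    if policy == "all":
        # Evaluate every rule and keep one boolean per rule.
        return [rule(row) for rule in rules]
    raise ValueError(f"unknown policy: {policy}")


# Stand-ins for a rule set with the two rules < price < 2 > and < price > 6 >.
rules = [lambda r: r["price"] < 2, lambda r: r["price"] > 6]
row = {"value": 0.0, "price": 1.0}

shortcut_result = check(rules, row, "shortcut")
all_result = check(rules, row, "all")
```

Under these assumptions, "shortcut" yields a single index (or nothing when all rules pass), while "all" yields one boolean per rule, which matches the array vector in the rule column of the example output.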

ruleSetColumn (optional) is a STRING scalar indicating the name of the input column whose value selects the rule set for each record. If it is not set, or the value in the specified column does not match any key in ruleSets, the default rule set is applied.
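The fallback to the default rule set can be modeled as a dictionary lookup. The following plain-Python sketch is illustrative only: `select_rule_set` is a hypothetical helper, `None` stands in for the NULL key, and the rules are kept as strings that mirror the rule sets used in the Examples section.

```python
from typing import Dict, List, Optional

# None plays the role of the NULL key of the default rule set.
rule_sets: Dict[Optional[int], List[str]] = {
    1: ["value > 1"],
    2: ["price < 2", "price > 6"],
    None: ["value*price > 10"],
}


def select_rule_set(row: dict, rule_set_column: Optional[str] = None) -> List[str]:
    """Pick the rule set for one record (hypothetical helper)."""
    if rule_set_column is None:
        # ruleSetColumn not set: the default rule set always applies.
        return rule_sets[None]
    # A value that matches no key falls back to the default rule set.
    return rule_sets.get(row.get(rule_set_column), rule_sets[None])


matched = select_rule_set({"sym": 1, "value": 0.0, "price": 2.0}, "sym")
unmatched = select_rule_set({"sym": 3, "value": 6.0, "price": 1.0}, "sym")
unset = select_rule_set({"sym": 2, "value": 6.0, "price": 1.0})
```

Here a record with sym = 1 is checked against the first rule set, while sym = 3 matches no key and is checked against the default one.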

callback (optional) is a function that takes a table as its only argument. The table contains a record output by the engine. If specified, the callback function is invoked once for each output record, with that record passed in as the argument. If not specified, the engine only inserts the checking results into the output table.

Examples

if (!existsCatalog("orca")) {
	createCatalog("orca")
}
go
use catalog orca

// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('engine')
g = createStreamGraph('engine')

// define rule sets
x = [1, 2, NULL]
y = [ [ < value > 1 > ], [ < price < 2 >, < price > 6 > ], [ < value*price > 10 > ] ]
ruleSets = dict(x, y)

// create a DFS table for the callback function to write results to
if(existsDatabase("dfs://temp")){
    dropDatabase("dfs://temp")
}
db = database("dfs://temp", VALUE, 1..3)
t1 = table(1:0, `sym`value`price, [INT,DOUBLE,DOUBLE])
pt = db.createPartitionedTable(t1,`pt,`sym)

// create callback function
def writeBack(result){
    if(result.rule[0]==false){
        temp = select sym,value,price from result
        loadTable("dfs://temp",`pt).append!(temp)
    }
}

g.source("trades", 1000:0, `sym`value`price`quantity, [INT, DOUBLE, DOUBLE, DOUBLE])
.ruleEngine(ruleSets=ruleSets, outputColumns=["sym","value","price"], policy="all", ruleSetColumn="sym", callback=writeBack)
.sink("output")
g.submit()
go

tmp=table(1 1 as sym, 0 2 as value, 2 2 as price, 3 3 as quantity)
appendOrcaStreamTable("trades", tmp)

select * from orca_table.output
sym value price rule
--- ----- ----- -------
1   0     2     [false]
1   2     2     [true]