createRuleEngine
Syntax
createRuleEngine(name, ruleSets, dummyTable, outputColumns, outputTable,
[policy], [ruleSetColumn], [callback], [snapshotDir],
[snapshotIntervalInMsgCount])
Details
Create a rule engine that supports multiple rules.
Workflow
The rule engine selects and applies the appropriate rules to inserted data based on a predefined rule set, then outputs the check result. The rule-matching and checking logic works as follows:
- The rule set is a dictionary in which each key corresponds to a group of rules. A NULL key indicates the default rule group.
- When data enters the engine, the value of its key field (specified by ruleSetColumn) is matched against the keys in the rule set. If the field value equals a key, that rule group is used to check the data. If ruleSetColumn is not specified or no rule is matched, the default rule group is used.
- The check result is determined by the specific rules and the configured policy. For details on policies, refer to the Arguments section.
- If a callback function is defined, the engine calls it for further data processing.
Scenarios
The rule engine supports millisecond-level real-time responses, precise handling of large-scale time-series data, and dynamic rule adjustments. It can be applied in scenarios such as:
- IoT: device monitoring (e.g., temperature, humidity), power monitoring (e.g., voltage, electricity usage).
- Finance: risk control, such as filtering orders, monitoring stock trading volume, and setting over-limit alerts.
Arguments
name is a string indicating the name of the engine.
ruleSets is a dictionary specifying the rule set. Each key is of STRING or INT type and identifies a rule group; each value is a tuple of metacode expressions. A NULL key marks the default rule group, which is used when ruleSetColumn is not specified or the data does not match any other key. A rule set must include a default rule group.
dummyTable is a table object whose schema must be the same as the input stream table. Whether dummyTable contains data does not matter.
outputColumns is a STRING vector indicating the input columns to be preserved in the output table.
outputTable is a table to which the engine inserts the check result. It can be an in-memory table or a DFS table. It includes the columns specified by outputColumns, followed by one additional column that stores the check result. When policy is set to "shortcut", this last column is of INT type (the index of the failed rule); otherwise, it is a BOOL array vector (BOOL[]).
policy (optional) is a STRING scalar indicating the rule-checking policy. It can take the following values:
- shortcut (default): When any rule fails (i.e., the check result is false), the rule engine immediately stops checking and returns the index of that rule (starting from 0). If all rules pass, it returns NULL. For example, if the data matches the following rules and the value of the age field is less than 18, the check result is false. The rule engine stops checking and returns 0 (the rule’s index) to the output table.
  (< age > 18 >,< income > 5000 >,< debtRatio < 0.5 >)
- all: Check all rules, and the engine returns the check result as a BOOL vector. For example, if the data matches the following rules and voltage = 220, current = 37.11, and temperature = 39, the check result would be [false, false, false].
  (< voltage > 221.86 >,< current > 39.96 >,< temperature > 44.43 >)
ruleSetColumn (optional) is a STRING scalar indicating the name of the input column whose value is matched against the keys of ruleSets. If it is not set, or the value in that column does not match any key, the default rule group is applied.
callback (optional) is a function that takes a table as input. The table contains a record output by the engine. If specified, the callback function is invoked once for each output record, with that record passed in as the argument. If not specified, the engine only inserts the check results into the output table.
snapshotDir and snapshotIntervalInMsgCount (optional) have no effect. They are included solely for compatibility with the ORCA framework and can be omitted.
Examples
Implementation steps
- Create a DFS table to store the data output by the callback function.
- Define the rule set that will be used to check inserted data.
- Define the callback function to further process the data based on the check result.
- Create the rule engine.
- Insert data into the rule engine.
Sample code
Create a DFS table
// Create a DFS table to store the data output by the callback function
db = database("dfs://temp", VALUE, 1..3)
t1 = table(1:0, `sym`value`price, [INT,DOUBLE,DOUBLE])
pt = db.createPartitionedTable(t1,`pt,`sym)
Define the rule set
The following rule set contains three groups of rules, with keys 1, 2, and NULL. The NULL key represents the default rule group.
x = [1, 2, NULL]
y = [ [ < value > 1 > ], [ < price < 2 >, < price > 6 > ], [ < value*price > 10 > ] ]
ruleSets = dict(x, y)
/* Output:
->(< (value * price) > 10 >)
2->(< price < 2 >,< price > 6 >)
1->(< value > 1 >)
*/
Define the callback function
The following callback function checks whether the first value in the check result of each row is false. If it is, the corresponding data is inserted into the DFS table.
def writeBack(result){
    if(result.rule[0]==false){
        temp = select sym,value,price from result
        loadTable("dfs://temp",`pt).append!(temp)
    }
}
Create the rule engine
names = `sym`value`price`quantity
types = [INT, DOUBLE, DOUBLE, DOUBLE]
dummy = table(1:0, names, types)
outputNames = `sym`value`price`rule
outputTypes = [INT, DOUBLE, DOUBLE, BOOL[]]
outputTable = table(10:0, outputNames, outputTypes)
test = createRuleEngine(
    name="ruleEngineTest",
    ruleSets=ruleSets,
    dummyTable=dummy,
    outputColumns=["sym","value","price"],
    outputTable=outputTable,
    policy="all",
    ruleSetColumn="sym",
    callback=writeBack)
Insert data into the rule engine
Insert two rows with sym = 1. These rows will be checked using the rule corresponding to key = 1 in the rule set, namely value > 1.
test.append!(table(1 as sym, 0 as value, 2 as price, 3 as quantity))
test.append!(table(1 as sym, 2 as value, 2 as price, 3 as quantity))
Insert three rows with sym = 2. These rows will be checked using the rules corresponding to key = 2 in the rule set, namely price < 2 and price > 6.
test.append!(table(2 as sym, 2 as value, 0 as price, 3 as quantity))
test.append!(table(2 as sym, 2 as value, 4 as price, 3 as quantity))
test.append!(table(2 as sym, 2 as value, 8 as price, 3 as quantity))
Insert two rows with sym = 3. Since the rule set only contains rules for keys 1 and 2, these rows will be checked using the rule with key = NULL in the rule set, namely value * price > 10.
test.append!(table(3 as sym, 2 as value, 3 as price, 3 as quantity))
test.append!(table(3 as sym, 2 as value, 6 as price, 3 as quantity))
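As an alternative to calling append! directly, the engine can be fed from a stream table, which is the setup the dummyTable description alludes to. The following is a minimal sketch, not part of the original example: the table name inputStream and the action name ruleCheck are hypothetical, and it assumes streaming subscription is enabled on the node. No rows are written to the stream table here, so the results shown below are unaffected.

```
// Hypothetical shared stream table with the same schema as the engine's dummyTable
share streamTable(1000:0, `sym`value`price`quantity, [INT, DOUBLE, DOUBLE, DOUBLE]) as inputStream

// Route every batch arriving in inputStream to the rule engine handle `test`
subscribeTable(tableName="inputStream", actionName="ruleCheck", offset=0, handler=append!{test}, msgAsTable=true)
```

With this in place, rows appended to inputStream would be checked in the same way as rows appended directly to the engine.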
View data in the outputTable.
select * from outputTable
| sym | value | price | rule |
|---|---|---|---|
| 1 | 0 | 2 | [false] |
| 1 | 2 | 2 | [true] |
| 2 | 2 | 0 | [true,false] |
| 2 | 2 | 4 | [false,false] |
| 2 | 2 | 8 | [false,true] |
| 3 | 2 | 3 | [false] |
| 3 | 2 | 6 | [true] |
After processing by the callback function, all rows whose first check result is false have been written to the DFS table dfs://temp/pt.
select * from loadTable("dfs://temp","pt")
| sym | value | price |
|---|---|---|
| 1 | 0 | 2 |
| 2 | 2 | 4 |
| 2 | 2 | 8 |
| 3 | 2 | 3 |
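
The example above uses policy="all". For comparison, the following sketch sets up an engine with policy="shortcut", reusing the age/income/debtRatio rules from the Arguments section. The engine name shortcutDemo, the table names, and the column name grp are hypothetical and chosen only for illustration.

```
// Two rule groups: key 1 uses three rules, the NULL key is the default group
keys = [1, NULL]
groups = [ [ < age > 18 >, < income > 5000 >, < debtRatio < 0.5 > ], [ < age > 18 > ] ]
creditRules = dict(keys, groups)

// Schema of the incoming data
creditDummy = table(1:0, `grp`age`income`debtRatio, [INT, INT, DOUBLE, DOUBLE])

// With policy="shortcut" the result column stores a rule index, so its type is INT, not BOOL[]
creditResult = table(10:0, `grp`age`failedRule, [INT, INT, INT])

creditEngine = createRuleEngine(
    name="shortcutDemo",
    ruleSets=creditRules,
    dummyTable=creditDummy,
    outputColumns=["grp","age"],
    outputTable=creditResult,
    policy="shortcut",
    ruleSetColumn="grp")

// First row fails the age rule; second row satisfies all three rules of group 1
creditEngine.append!(table(1 as grp, 16 as age, 8000.0 as income, 0.2 as debtRatio))
creditEngine.append!(table(1 as grp, 30 as age, 8000.0 as income, 0.2 as debtRatio))

select * from creditResult
```

Going by the policy description, failedRule is expected to hold 0 for the first row (the age rule fails first) and NULL for the second row (all rules pass).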
Related functions: updateRule, deleteRule, getRules
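The related functions can be used to adjust rules while the engine is running. The sketch below is illustrative only: the argument order shown (engine name, key, rules) is an assumption that should be checked against the reference page of each function. It operates on the ruleEngineTest engine created in the example.

```
// Assumed signatures: updateRule(engineName, key, rules), deleteRule(engineName, key), getRules([engineName])

// Replace the rule group for key 1 with a stricter rule
updateRule("ruleEngineTest", 1, [ < value > 5 > ])

// Remove the rule group for key 2; rows with sym = 2 would then fall back to the default group
deleteRule("ruleEngineTest", 2)

// Inspect the rules currently held by the engine
getRules(["ruleEngineTest"])
```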
