Rule Engine
The rule engine is a stream processing engine provided by DolphinDB. Its core feature is dynamically selecting and applying the relevant business rules based on characteristics of the data. When data flows into the engine, the engine selects matching rules from a predefined set based on a key field in the data (such as user ID, product type, or transaction channel) and evaluates the data against them. The results (whether the data passes the rules) are written to an output table and can also trigger a custom callback function for immediate processing.
Rule engine features:
- Millisecond-level real-time response
- Accurate processing of large-scale time-series data
- Online dynamic rule adjustment
Therefore, the rule engine is suitable for scenarios requiring high flexibility, real-time processing, and configurable business logic, such as:
- IoT: Device monitoring, e.g., temperature and humidity; power monitoring, e.g., voltage and electricity usage.
- Finance: Risk control, e.g., filtering orders according to specified rules, monitoring stock trading volumes, setting over-limit warning signals, etc.
The rule engine is created using the createRuleEngine function.
Syntax:
createRuleEngine(name, ruleSets, dummyTable, outputColumns, outputTable, [policy], [ruleSetColumn], [callback], [snapshotDir], [snapshotIntervalInMsgCount])
For more details, see createRuleEngine.
Rules in the engine can be dynamically adjusted using the updateRule and deleteRule functions, and existing rules can be retrieved using getRules:
updateRule(engineName, key, rules, [add=false])
deleteRule(engineName, key)
getRules([engineName])
For more details, see updateRule, deleteRule, and getRules.
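The three maintenance functions can be sketched as follows. This is a minimal illustration, not a definitive usage pattern: it assumes an engine named "ruleEngine" already exists and that its input schema has voltage and current columns. The thresholds are made up, and the comment on add=true reflects the assumption that it appends to the existing rule set rather than replacing it.

```dolphindb
// Assumption: an engine named "ruleEngine" exists and its dummyTable
// contains voltage and current columns. Thresholds are illustrative only.

// Replace the rule set stored under key "ID1" (add defaults to false)
updateRule("ruleEngine", "ID1", [<voltage > 230>, <voltage < 45>])

// Assumed behavior of add=true: append one more rule to the same key
updateRule("ruleEngine", "ID1", [<current > 40>], true)

// Inspect the rules currently held by the engine
getRules("ruleEngine")

// Remove the rule set for "ID1"
deleteRule("ruleEngine", "ID1")
```

Once the rule set for a key is deleted, records carrying that key no longer match any rule set, so they are evaluated against the default rule.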
Calculation Rules
The rule engine works as follows: first, a set of rules is defined using the ruleSets parameter in the form of a dictionary. The key indicates the rule identifier. If the key is NULL, the rule is treated as the default rule. The ruleSetColumn parameter specifies the field in the input data used to match rules. When the value of this field equals a key in ruleSets, the corresponding rule is considered matched. The engine then generates the calculation result according to the policy parameter. If the callback parameter is specified, the callback logic will also be triggered. If ruleSetColumn is not specified or no rule is matched, the default rule is applied.
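The matching logic described above can be sketched with a small engine. The engine name, table names, column names, and thresholds below are hypothetical and chosen only for illustration. The calls follow the createRuleEngine syntax shown earlier, with policy="all" and a BOOL[] result column as the last column of the output table.

```dolphindb
// Hypothetical rule sets keyed by device type
demoRules = dict(STRING, ANY)
demoRules["pump"] = [<pressure > 8.0>, <pressure < 1.0>]
demoRules["fan"] = [<speed > 3000.0>]
// A NULL key marks the default rule
demoRules[string(NULL)] = [<pressure > 100.0>]

// Input schema and output table (input columns plus the rule results)
demoSchema = table(1:0, `deviceType`pressure`speed, [STRING, DOUBLE, DOUBLE])
demoOutput = table(1:0, `deviceType`pressure`speed`rule, [STRING, DOUBLE, DOUBLE, BOOL[]])

demoEngine = createRuleEngine(
    name="demoRuleEngine",
    ruleSets=demoRules,
    dummyTable=demoSchema,
    outputColumns=["deviceType", "pressure", "speed"],
    outputTable=demoOutput,
    policy="all",
    ruleSetColumn="deviceType"
)

// "pump" and "fan" match their own rule sets;
// "valve" has no matching key, so the default rule is applied
demoData = table(["pump", "fan", "valve"] as deviceType, [9.5, 2.0, 3.0] as pressure, [0.0, 3500.0, 0.0] as speed)
demoEngine.append!(demoData)

select * from demoOutput

// Release the engine so the sketch can be rerun
dropStreamEngine("demoRuleEngine")
```

With policy="all", each output row carries one Boolean per rule in the matched rule set, so the length of the rule column can differ from row to row.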
Application Examples
This example collects the current, voltage, and temperature of power equipment and configures a rule set for monitoring them. The collected data is stored in a stream table. The rule engine subscribes to the stream table to obtain real-time data, evaluates each record against the rules, and outputs the results to another table.
Implementation steps:
- Create a stream table to store the collected data.
share streamTable(1000:0,`time`pointId`voltage`current`temprature,[TIMESTAMP,STRING,DOUBLE,DOUBLE,DOUBLE]) as inputTable
- Create a stream table for data output by the rule engine and another stream table for data processed by the callback function defined in the rule engine.
// Create a stream table for data output by the rule engine
share streamTable(1000:0,`time`pointId`voltage`current`temprature`inputTime`rule,[TIMESTAMP,STRING,DOUBLE,DOUBLE,DOUBLE,NANOTIMESTAMP,BOOL[]]) as outputTable
// Create a stream table for data processed by the callback function defined in the rule engine
share streamTable(1000:0,`time`pointId`voltage`current`temprature`inputTime`outputTime`comment,[TIMESTAMP,STRING,DOUBLE,DOUBLE,DOUBLE,NANOTIMESTAMP,NANOTIMESTAMP,STRING]) as resultTable
- Define the rule set.
n = 10
pointId = `ID + string(1..n)
// Randomly generate upper and lower thresholds for each device
voltageHigh = round(double(rand(220..230,n))-rand(0.6,n),2)
voltageLow = round(double(rand(45..50,n))+rand(0.5,n),2)
currentHigh = round(double(rand(40..45,n))-rand(0.6,n),2)
currentLow = round(double(rand(7..9,n))+rand(0.5,n),2)
tempratureHigh = round(double(rand(40..45,n))-rand(0.6,n),2)
tempratureLow = round(double(rand(7..9,n))+rand(0.5,n),2)
pt = table(pointId,voltageHigh,voltageLow,currentHigh,currentLow,tempratureHigh,tempratureLow)
ids = exec pointId from pt
// Build one rule set per device, keyed by pointId
ruleSet = dict(STRING,ANY)
for(i in 0:pt.size()){
    tmp = pt[i]
    keys = tmp.keys()[1:]
    value = tmp.values()[1:]
    a = array(STRING)
    for(j in 0:keys.size()){
        if(keys[j] like "%High"){
            s = strpos(keys[j],"High")
            a.append!(keys[j][0:s] + ">" + value[j])
        }else{
            s = strpos(keys[j],"Low")
            a.append!(keys[j][0:s] + "<" + value[j])
        }
    }
    ruleSet[ids[i]] = parseExpr(a)
}
// Default rule, applied when no key matches
ruleSet[string(NULL)] = [<voltage > 100000>]
- Define the callback function.
// For each rule that evaluates to true, write an alert record to resultTable
def writeBack(result){
    outputTime = now(true)
    if(result.rule[0]==true){
        s = "High voltage"
        insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
    }
    if(result.rule[1]==true){
        s = "Low voltage"
        insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
    }
    if(result.rule[2]==true){
        s = "High current"
        insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
    }
    if(result.rule[3]==true){
        s = "Low current"
        insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
    }
    if(result.rule[4]==true){
        s = "High temperature"
        insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
    }
    if(result.rule[5]==true){
        s = "Low temperature"
        insert into resultTable values (result.time,result.pointId,result.voltage,result.current,result.temprature,result.inputTime,outputTime,s)
    }
}
- Create the rule engine.
// The engine's input schema is the input table's schema plus an inputTime column
colNames = inputTable.schema().colDefs.name join `inputTime
colTypes = inputTable.schema().colDefs.typeString join `NANOTIMESTAMP
schemaTable = table(1:0,colNames,colTypes)
ruleEngine = createRuleEngine(
    name="ruleEngine",
    ruleSets=ruleSet,
    dummyTable=schemaTable,
    outputColumns=["time","pointId","voltage","current","temprature","inputTime"],
    outputTable=outputTable,
    policy="all",
    ruleSetColumn="pointId",
    callback=writeBack
)
- Subscribe to the stream table and insert 10 simulated records.
// Add the ingestion time to each message and feed it to the rule engine
def handle(msg){
    tmp = select *,now(true) as inputTime from msg
    getStreamEngine("ruleEngine").append!(tmp)
}
// Subscribe to the stream table
subscribeTable(
    tableName="inputTable",
    actionName="RuleEngine",
    handler=handle,
    msgAsTable=true,
    offset=0
)
// Generate 10 simulated records
n = 10
t = table(n:n,`time`pointId`voltage`current`temprature,[TIMESTAMP,STRING,DOUBLE,DOUBLE,DOUBLE])
t["time"] = now() + (1..10)*1000
t["pointId"] = `ID + string(1..n)
t["voltage"] = round(double(rand(45..225,n))+rand(0.5,n),2)
t["current"] = round(double(rand(7..43,n))+rand(0.5,n),2)
t["temprature"] = round(double(rand(7..43,n))+rand(0.5,n),2)
// Insert simulated records into the stream table
inputTable.append!(t)
View the input table.
| time | pointId | voltage | current | temprature |
|---|---|---|---|---|
| 2025.11.09 21:09:24.883 | ID1 | 100.15 | 37.19 | 22.33 |
| 2025.11.09 21:09:25.883 | ID2 | 196.06 | 11.12 | 18.32 |
| 2025.11.09 21:09:26.883 | ID3 | 81.21 | 35.07 | 30.04 |
| 2025.11.09 21:09:27.883 | ID4 | 181.4 | 40.18 | 40.01 |
| 2025.11.09 21:09:28.883 | ID5 | 217.13 | 42.31 | 27.18 |
| 2025.11.09 21:09:29.883 | ID6 | 182.47 | 24.18 | 43.36 |
| 2025.11.09 21:09:30.883 | ID7 | 124.44 | 12.27 | 43.25 |
| 2025.11.09 21:09:31.883 | ID8 | 198.33 | 39.03 | 8.33 |
| 2025.11.09 21:09:32.883 | ID9 | 78.04 | 16.04 | 31.31 |
| 2025.11.09 21:09:33.883 | ID10 | 194.28 | 18.45 | 35.37 |
View the rule engine’s output table.
| time | pointId | voltage | current | temprature | inputTime | rule |
|---|---|---|---|---|---|---|
| 2025.11.09 21:09:24.883 | ID1 | 100.15 | 37.19 | 22.33 | 2025.11.09 21:09:23.886 | [false, false, false, false, false, false] |
| 2025.11.09 21:09:25.883 | ID2 | 196.06 | 11.12 | 18.32 | 2025.11.09 21:09:23.886 | [false, false, false, false, false, false] |
| 2025.11.09 21:09:26.883 | ID3 | 81.21 | 35.07 | 30.04 | 2025.11.09 21:09:23.886 | [false, false, false, false, false, false] |
| 2025.11.09 21:09:27.883 | ID4 | 181.4 | 40.18 | 40.01 | 2025.11.09 21:09:23.886 | [false, false, false, false, false, false] |
| 2025.11.09 21:09:28.883 | ID5 | 217.13 | 42.31 | 27.18 | 2025.11.09 21:09:23.886 | [false, false, true, false, false, false] |
| 2025.11.09 21:09:29.883 | ID6 | 182.47 | 24.18 | 43.36 | 2025.11.09 21:09:23.886 | [false, false, false, false, true, false] |
| 2025.11.09 21:09:30.883 | ID7 | 124.44 | 12.27 | 43.25 | 2025.11.09 21:09:23.886 | [false, false, false, false, true, false] |
| 2025.11.09 21:09:31.883 | ID8 | 198.33 | 39.03 | 8.33 | 2025.11.09 21:09:23.886 | [false, false, false, false, false, true] |
| 2025.11.09 21:09:32.883 | ID9 | 78.04 | 16.04 | 31.31 | 2025.11.09 21:09:23.886 | [false, false, false, false, false, false] |
| 2025.11.09 21:09:33.883 | ID10 | 194.28 | 18.45 | 35.37 | 2025.11.09 21:09:23.886 | [false, false, false, false, false, false] |
View the output table after callback processing.
| time | pointId | voltage | current | temprature | inputTime | outputTime | comment |
|---|---|---|---|---|---|---|---|
| 2025.11.09 21:09:28.883 | ID5 | 217.13 | 42.31 | 27.18 | 2025.11.09 21:09:23.886 | 2025.11.09 21:09:23.886 | High current |
| 2025.11.09 21:09:29.883 | ID6 | 182.47 | 24.18 | 43.36 | 2025.11.09 21:09:23.886 | 2025.11.09 21:09:23.887 | High temperature |
| 2025.11.09 21:09:30.883 | ID7 | 124.44 | 12.27 | 43.25 | 2025.11.09 21:09:23.886 | 2025.11.09 21:09:23.887 | High temperature |
| 2025.11.09 21:09:31.883 | ID8 | 198.33 | 39.03 | 8.33 | 2025.11.09 21:09:23.886 | 2025.11.09 21:09:23.887 | Low temperature |
Explanation for the results
The rule set for device 'ID5' is:
(< voltage > 221.86 >,< voltage < 45.16 >,< current > 39.96 >,< current < 7.47 >,< temprature > 44.43 >,< temprature < 8.03 >)
The input current (42.31) matches the rule "current > 39.96". After callback processing, the output is "High current".
The rule set for device 'ID6' is:
(< voltage > 221.89 >,< voltage < 49.05 >,< current > 43.42 >,< current < 7.06 >,< temprature > 41.98 >,< temprature < 7.418>)
The input temperature (43.36) matches the rule "temprature > 41.98". After callback processing, the output is "High temperature".
The rule set for device 'ID7' is:
(< voltage > 226.61 >,< voltage < 48.40 >,< current > 40.77 >,< current < 9.19 >,< temprature > 41.95 >,< temprature < 8.14 >)
The input temperature (43.25) matches the rule "temprature > 41.95". After callback processing, the output is "High temperature".
The rule set for device 'ID8' is:
(< voltage > 228.97 >,< voltage < 46.137 >,< current > 42.607 >,< current < 7.29 >,< temprature > 44.40 >,< temprature < 9.27 >)
The input temperature (8.33) matches the rule "temprature < 9.27". After callback processing, the output is "Low temperature".
Dynamically adjust the rules.
// Adjust the rules
newRules = [ < voltage > 223.41 >,< voltage < 49.25 >,< current > 36.84 >,< current < 8.4 >,< temprature > 44.60 >,< temprature < 7.43 >]
updateRule("ruleEngine","ID1",newRules)
// Insert a simulated record
insert into inputTable values (now(), 'ID1',100.15,37.19,22.33)
In this record, the current is 37.19. Under the previous rules for ID1, no alert was triggered. After the update, the rule "current > 36.84" is matched, so the third element of the rule result is true and the callback adds a "High current" record to resultTable.
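To check the effect of the update, the two result tables can be queried for ID1. This is a minimal sketch using the table, subscription, and engine names defined in the steps above. Because subscription handling is asynchronous, the new rows may take a moment to appear. The cleanup calls at the end are optional and are not part of the original walkthrough.

```dolphindb
// Rows for ID1 in the engine's output table; the latest row reflects the updated rules
select * from outputTable where pointId = "ID1"

// Alerts written for ID1 by the writeBack callback
select * from resultTable where pointId = "ID1"

// Optional cleanup once the example is finished
unsubscribeTable(tableName="inputTable", actionName="RuleEngine")
dropStreamEngine("ruleEngine")
```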
