createAnomalyDetectionEngine
Syntax
createAnomalyDetectionEngine(name, metrics, dummyTable, outputTable, timeColumn, [keyColumn], [windowSize], [step], [garbageSize], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [raftGroup])
Details
This function creates an anomaly detection engine and returns a table object. Data inserted into this table is used to calculate specified metrics to detect anomalies.
Note that the following functions are not supported in metrics: next
, talibNull
, linearTimeTrend
, iterate
, or aggregate function nested within an order-sensitive function (e.g., tmsum(sum())
).
For more application scenarios, see Streaming Engines.
Calculation Rules
The anomaly detection engine uses different calculation rules for the following 3 types of anomaly metrics.
Comparison between a column and a constant or between columns. Non-aggregate functions may be used but aggregate function are not used. For examples:
qty < 4, qty > price
,lt(qty, prev(qty))
,isNull(qty) == false
, etc. For these metrics, the engine conducts calculations for each row and determines whether to output the result.Comparison between aggregate function result and a constant or between aggregate function results. Non-aggregate functions may be used, but their arguments may only include aggregate functions and/or constants, not columns. For examples:
avg(qty - price) > 10
,percentile(qty, 90) < 100
,max(qty) < avg(qty) * 2
,le(sum(qty), 5)
, etc. For these metrics, the engine conducts calculations at frequencies determined by the parameter step and determines whether to output the result.Comparison between aggregate function result and a column, or non-aggregate functions are used and their arguments include aggregate functions and columns. For examples:
avg(qty) > qty
,le(med(qty), price)
, etc. For these metrics, the engine conducts calculations at frequencies determined by the parameter step and determines whether to output the result.
Note: If an aggregate function is used in metrics, the parameters windowSize and step must be specified. The anomaly metrics are calculated in a window of windowSize at every step. To facilitate observation and comparison of calculation results, the engine automatically adjusts the starting point of the first window. See createTimeSeriesEngine for the alignment rules.
If the parameter keyColumn is specified, the anomaly metrics are calculated in each group.
Other Features:
Data cleanup: If an aggregate function or order-sensitive function is specified for metrics, the engine keeps the historical data. You can specify the parameter garbageSize to clean up the data that are no longer needed. If keyColumn is specified, data cleanup is performed within each group independently.
Snapshot: Snapshot mechanism is used to restore the streaming engine to the latest snapshot after system interruption. (See parameters snapshotDir and snapshotIntervalInMsgCount)
High Availability of Streaming Engines: To enable high availability on the streaming engines, specify the parameter raftGroup on the leader node of the raft group on the subscriber. When a leader is down, the raft group automatically switches to a new leader to resubscribe to the stream table.
Arguments
name is a string indicating the name of the anomaly detection engine. It is the unique identifier of the engine on a data/compute node. It can contain letters, numbers and underscores, and must start with a letter.
metrics is metacode or tuple specifying the formulas for anomaly detection. It uses functions or expressions such as <[sum(qty)>5, avg(qty)>qty, qty<4]>
that indicate anomaly conditions and return Boolean values.
Note: The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables. However, if there are multiple identical column names specified in metrics, their cases must be consistent.
dummyTable is a table object whose schema must be the same as the stream table to which the engine subscribes.
outputTable is the output table of detected anomalies. It can be an in-memory table or a DFS table. Create an empty table before calling the function.
The output columns are in the following order:
(1) The first column is of temporal data type and stores the time when an anomaly is detected.
(2) If keyColumn is specified, the following column(s) are keyColumn.
(3) Then followed by column “type” (of INT type) indicating the position of the condition in metrics;
(4) And column “metric” (of STRING/SYMBOL type) indicating the content of the anomaly conditions.
timeColumn is a string indicating the name of the temporal column of the input stream table.
keyColumn (optional) is a STRING scalar or vector indicating the grouping columns. The anomaly detection engine conducts calculations within each group specified by keyColumn.
windowSize (optional) is a positive integer indicating the size of window for calculation. Only the left boundary is included in the window.
step (optional) is a positive integer indicating the duration between 2 adjacent windows. The value of windowSize must be a multiple of step.
Note: If aggregate functions are used in metrics, parameters windowSize and step must be specified.
garbageSize (optional) is a positive integer. The default value is 2,000 (in Bytes).
If keyColumn is not specified, when the number of historical records in memory exceeds garbageSize, the system will clear the records that are no longer needed.
If keyColumn is specified, garbage collection is conducted separately in each group. When the number of historical records in a group exceeds garbageSize, the system will clear the records that are no longer needed within the group.
Note that garbageSize only takes effect when aggregate function(s) are specified in parameter metrics.
roundTime (optional) is a Boolean value indicating the method to align the window boundary if the time column is in millisecond or second precision and step is greater than one minute. The default value is true indicating the alignment is based on the multi-minute rule. False means alignment is based on the one-minute rule (See Alignment Rules).
To enable snapshot in the streaming engines, specify parameters snapshotDir and snapshotIntervalInMsgCount.
snapshotDir (optional) is a string indicating the directory where the streaming engine snapshot is saved.
The directory must already exist, otherwise an exception is thrown.
If the snapshotDir is specified, the system checks whether a snapshot already exists in the directory when creating a streaming engine. If it exists, the snapshot will be loaded to restore the engine state.
Multiple streaming engines can share a directory where the snapshot files are named as the engine names.
The file extension of a snapshot can be:
<engineName>.tmp: a temporary snapshot
<engineName>.snapshot: a snapshot that is generated and flushed to disk
<engineName>.old: if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.
snapshotIntervalInMsgCount (optional) is a positive integer indicating the number of messages to receive before the next snapshot is saved.
raftGroup (optional) is an integer greater than 1, indicating the ID of the raft group on the high-availability streaming subscriber specified by the configuration parameter streamingRaftGroups. Specify raftGroup to enable high availability on the streaming engine. When an engine is created on the leader, it is also created on each follower and the engine snapshot is synchronized to the followers. When a leader is down, the raft group automatically switches to a new leader to resubscribe to the stream table. Note that snapshotDir must also be specified when specifying a raft group.
Examples
The following example creates a table “engine” with createAnomalyDetectionEngine
, then subscribes to the stream table trades and writes data to the engine. The engine conducts calculations by group based on column sym and saves the results to outputTable.
$ share streamTable(1000:0, `time`sym`qty, [TIMESTAMP, SYMBOL, INT]) as trades
$ outputTable = table(1000:0, `time`sym`type`metric, [TIMESTAMP, SYMBOL, INT, STRING])
$ engine = createAnomalyDetectionEngine(name="anomalyDetection1", metrics=<[sum(qty) > 5, avg(qty) > qty, qty < 4]>, dummyTable=trades, outputTable=outputTable, timeColumn=`time, keyColumn=`sym, windowSize=3, step=3)
$ subscribeTable(tableName="trades", actionName="anomalyDetectionSub1", offset=0, handler=append!{engine}, msgAsTable=true)
$ def writeData(n){
$ timev = 2018.10.08T01:01:01.001 + 1..n
$ symv =take(`A`B, n)
$ qtyv = n..1
$ insert into trades values(timev, symv, qtyv)
$ }
$ writeData(6);
$ select * from trades;
time |
sym |
qty |
---|---|---|
2018.10.08T01:01:01.002 |
A |
6 |
2018.10.08T01:01:01.003 |
B |
5 |
2018.10.08T01:01:01.004 |
A |
4 |
2018.10.08T01:01:01.005 |
B |
3 |
2018.10.08T01:01:01.006 |
A |
2 |
2018.10.08T01:01:01.007 |
B |
1 |
$ select * from outputTable;
time |
sym |
type |
metric |
---|---|---|---|
2018.10.08T01:01:01.003 |
A |
0 |
sum(qty) > 5 |
2018.10.08T01:01:01.004 |
A |
1 |
avg(qty) > qty |
2018.10.08T01:01:01.005 |
B |
2 |
qty < 4 |
2018.10.08T01:01:01.006 |
A |
1 |
avg(qty) > qty |
2018.10.08T01:01:01.006 |
A |
2 |
qty < 4 |
2018.10.08T01:01:01.006 |
B |
0 |
sum(qty) > 5 |
2018.10.08T01:01:01.007 |
B |
1 |
avg(qty) > qty |
2018.10.08T01:01:01.007 |
B |
2 |
qty < 4 |
The calculation process of the anomaly detection engine is explained in details below:
(1) The indicator sum(qty)>5
represents the comparison between the aggregate result and a constant, the anomaly detection engine checks this indicator during the calculation in each window.
The first window ranges from 2018.10.08T01:01:01.000 to 2018.10.08T01:01:01.002, and the sum(qty) of A and B is calculated respectively. At 2018.10.08T01:01:01.003 the engine starts to judge whether it meets the condition sum(qty)>5.
The second window ranges from 2018.10.08T01:01:01.003 to 2018.10.08T01:01:01.005, at 2018.10.08T01:01:01.006, the engine starts to judge whether it meets the condition sum(qty)>5, and so on.
(2) The indicator avg(qty)>qty
represents the comparison between the aggregate result and a certain column. Therefore, whenever data arrives, the anomaly detection engine compares the data with the aggregate result of the previous window. Until the next aggregation is triggered, the engine checks whether the result meets the conditions and outputs it.
The first window ranges from 2018.10.08T01:01:01.000 to 2018.10.08T01:01:01.002, the avg(qty) of A and B is calculated respectively. Each qty between 2018.10.08T01:01:01.003 and 2018.10.08T01:01:01.005 will be compared with the avg(qty) of the previous window.
The window moves at 2018.10.08T01:01:01.005.
The second window ranges from 2018.10.08T01:01:01.003 to 2018.10.08T01:01:01.005. In the second window, the avg(qty) of A and B is calculated, each qty between 2018.10.08T01:01:01.006 and 2018.10.08T01:01:01.008 will be compared with the avg(qty) of the previous window, and so on.
(3) The metric qty < 4
represents the comparison between a column and a constant. Therefore, whenever a new record arrives, the anomaly detection engine compares the value with 4.