DStream::anomalyDetectionEngine
Syntax
DStream::anomalyDetectionEngine(metrics, timeColumn, [keyColumn],
[windowSize], [step], [garbageSize], [roundTime=true],
[anomalyDescription])
Details
Creates an anomaly detection engine. For details, see createAnomalyDetectionEngine.
Return value: A DStream object.
Arguments
metrics is metacode or a tuple specifying the formulas for anomaly detection. It uses functions or expressions, such as <[sum(qty)>5, avg(qty)>qty, qty<4]>, that indicate anomaly conditions and return Boolean values.
Note: The column names specified in metrics are not case-sensitive, so their case need not match the column names of the input tables.
timeColumn is a string indicating the name of the temporal column of the input stream table.
keyColumn (optional) is a STRING scalar or vector indicating the grouping columns. The anomaly detection engine conducts calculations within each group specified by keyColumn.
windowSize (optional) is a positive integer indicating the size of the calculation window. Each window includes its left boundary and excludes its right boundary.
step (optional) is a positive integer indicating the interval between two adjacent windows. The value of windowSize must be a multiple of step.
Note: If aggregate functions are used in metrics, parameters windowSize and step must be specified.
garbageSize (optional) sets the number of historical records kept in memory above which the system triggers garbage collection.
- If keyColumn is not specified, when the number of historical records in memory exceeds garbageSize, the system will clear the records that are no longer needed.
- If keyColumn is specified, garbage collection is conducted separately in each group. When the number of historical records in a group exceeds garbageSize, the system will clear the records that are no longer needed within the group.
roundTime (optional) is a Boolean value indicating how window boundaries are aligned when the time column has millisecond or second precision and step is greater than one minute. The default value is true, which aligns windows by the multi-minute rule. False aligns them by the one-minute rule (see Alignment Rules).
anomalyDescription (optional) is a STRING vector, where each element indicates a custom description for the corresponding anomaly condition specified in metrics.
Examples
if (!existsCatalog("orca")) {
createCatalog("orca")
}
go
use catalog orca
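// Build a stream graph: source table -> anomaly detection engine -> sink table.
// windowSize and step are specified because metrics uses the aggregate functions sum and avg.
adGraph = createStreamGraph("anomalyDetection")
adGraph.source("trade", 1000:0, `time`sym`qty, [TIMESTAMP, SYMBOL, INT])
.anomalyDetectionEngine(metrics=<[sum(qty) > 5, avg(qty) > qty, qty < 4]>, timeColumn=`time, keyColumn=`sym, windowSize=3, step=3)
.sink("anomal_output")
adGraph.submit()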
go
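// Append six test records that alternate between groups A and B.
times=2024.10.08T01:01:01.001 + 1..6
syms=["A", "B", "A", "B", "A", "B"]
qtys=[6, 5, 4, 3, 2, 1]
tmp=table(times as timestamp, syms as sym, qtys as qty)
appendOrcaStreamTable("trade", tmp)
// Query the anomalies written to the sink table.
select * from orca_table.anomal_output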
time | sym | type | metric |
---|---|---|---|
2024.10.08T01:01:01.003 | A | 0 | sum(qty) > 5 |
2024.10.08T01:01:01.004 | A | 1 | avg(qty) > qty |
2024.10.08T01:01:01.005 | B | 2 | qty < 4 |
2024.10.08T01:01:01.006 | A | 1 | avg(qty) > qty |
2024.10.08T01:01:01.006 | A | 2 | qty < 4 |
2024.10.08T01:01:01.006 | B | 0 | sum(qty) > 5 |
2024.10.08T01:01:01.007 | B | 1 | avg(qty) > qty |
2024.10.08T01:01:01.007 | B | 2 | qty < 4 |
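The optional garbageSize and anomalyDescription arguments are not exercised above. The following is a minimal sketch of how they might be passed, assuming the same catalog and table schema as the example. The graph name, table names, garbageSize value, and description strings are hypothetical and chosen only for illustration. anomalyDescription supplies one string per condition in metrics, in the same order. The source does not state how the descriptions appear in the output table, so no output is shown.

```dolphindb
// Sketch only: "anomalyDetectionDesc", "trade_desc", "anomaly_desc_output",
// the garbageSize value, and the description strings are hypothetical.
use catalog orca
descGraph = createStreamGraph("anomalyDetectionDesc")
// Two conditions in metrics, so anomalyDescription has two elements.
// garbageSize is applied per group because keyColumn is specified.
descGraph.source("trade_desc", 1000:0, `time`sym`qty, [TIMESTAMP, SYMBOL, INT])
.anomalyDetectionEngine(metrics=<[sum(qty) > 5, qty < 4]>, timeColumn=`time, keyColumn=`sym, windowSize=3, step=3, garbageSize=50000, anomalyDescription=["window total above 5", "quantity below 4"])
.sink("anomaly_desc_output")
descGraph.submit()
```

Keeping the descriptions in the same order as the conditions in metrics matters, since each description is matched to its condition by position.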