addMetrics
Syntax
addMetrics(engine, newMetrics, newMetricsSchema, [windowSize], [fill])
Alias: extendMetrics
Arguments
engine is the abstract table object returned by a streaming engine function such as createTimeSeriesEngine. Note that addMetrics cannot be used with engines created by createAnomalyDetectionEngine or createReactiveStateEngine.
newMetrics is metacode indicating the new metrics to be calculated by the streaming engine. The metacode can include one or more expressions, built-in or user-defined functions, or a constant scalar/vector. Note that the output column for a constant vector must be in array vector form.
newMetricsSchema is a table object specifying the column names and data types of the new metrics in the output table.
windowSize (optional) is a positive integer indicating the window length used to calculate the new metrics. It applies only to the time-series engine and must be one of the windowSize values of the existing time-series engine. The default value is the first windowSize value of the existing time-series engine.
fill (optional) is a scalar or vector specifying how to handle an empty window (in a group). It can be:
- 'none': no result is output.
- 'null': output a NULL value.
- 'ffill': output the result of the previous window.
- a specific value: output the specified value. Its type must match the output type of the corresponding metric.
fill can be a vector that specifies a different filling method for each metric. The size of the vector must equal the number of metrics specified in newMetrics, and no element of the vector can be 'none'.
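As an illustration of how the arguments fit together, the following is a minimal sketch rather than a verified recipe. It assumes a time-series engine handle named agg1 already exists and that its input table has an INT column x. The output column names max_x and min_x are hypothetical. windowSize is omitted, so the default described above applies. Whether fill is accepted may depend on the server version and on how the engine was created.

```
// Assumption: agg1 is an existing time-series engine whose input has an INT column x.
// Schema of the two new output columns (max_x and min_x are illustrative names).
newSchema = table(1:0, `max_x`min_x, [INT, INT])

// Add two metrics in one call. fill is a vector with one entry per new metric:
// 'null' for max_x and 'ffill' for min_x ('none' is not allowed inside a vector).
addMetrics(agg1, <[max(x), min(x)]>, newSchema, fill=`null`ffill)
```

The schema table lists one column per new metric, in the same order as the metacode, so that each result lands in the intended output column.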
Details
Dynamically add new measures to a streaming engine.
Examples
Calculate the sum of column x with a streaming engine:
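// Stream table that receives the input data
share streamTable(10000:0,`time`id`x, [TIMESTAMP,SYMBOL,INT]) as t
// Output table that stores the engine results
output1 = table(10000:0, `time`sum_x, [TIMESTAMP,INT])
// Time-series engine: windows of 100 ms sliding every 50 ms, computing sum(x)
agg1 = createTimeSeriesEngine(name=`agg1, windowSize=100, step=50, metrics=<sum(x)>, dummyTable=t, outputTable=output1, timeColumn=`time)
// Feed rows appended to t into the engine
subscribeTable(tableName="t", actionName="agg1", offset=0, handler=append!{agg1}, msgAsTable=true)
// Insert 500 rows, one per millisecond, with x = 1..500
n=500
time=2019.01.01T00:00:00.000+(1..n)
id=take(`ABC`DEF, n)
x=1..n
insert into t values(time, id, x);
select * from output1;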
time | sum_x |
---|---|
2019.01.01T00:00:00.050 | 1,225 |
2019.01.01T00:00:00.100 | 4,950 |
2019.01.01T00:00:00.150 | 9,950 |
2019.01.01T00:00:00.200 | 14,950 |
2019.01.01T00:00:00.250 | 19,950 |
2019.01.01T00:00:00.300 | 24,950 |
2019.01.01T00:00:00.350 | 29,950 |
2019.01.01T00:00:00.400 | 34,950 |
2019.01.01T00:00:00.450 | 39,950 |
2019.01.01T00:00:00.500 | 44,950 |
Now add a new measure avg(x) to the streaming engine. The new measure's column name is avg_x and its data type is DOUBLE in the output table.
// Schema of the new output column
newMetricsSchema = table(1:0, [`avg_x], [DOUBLE])
addMetrics(agg1, <avg(x)>, newMetricsSchema);
// Insert 300 more rows, one per millisecond, with x = 501..800
n=300
time=2019.01.01T00:00:00.500+(1..n)
id=take(`ABC`DEF, n)
x=500+(1..n)
insert into t values(time, id, x);
select * from output1;
time | sum_x | avg_x |
---|---|---|
2019.01.01T00:00:00.050 | 1,225 | |
2019.01.01T00:00:00.100 | 4,950 | |
2019.01.01T00:00:00.150 | 9,950 | |
2019.01.01T00:00:00.200 | 14,950 | |
2019.01.01T00:00:00.250 | 19,950 | |
2019.01.01T00:00:00.300 | 24,950 | |
2019.01.01T00:00:00.350 | 29,950 | |
2019.01.01T00:00:00.400 | 34,950 | |
2019.01.01T00:00:00.450 | 39,950 | |
2019.01.01T00:00:00.500 | 44,950 | |
2019.01.01T00:00:00.550 | 49,950 | 525 |
2019.01.01T00:00:00.600 | 54,950 | 550 |
2019.01.01T00:00:00.650 | 59,950 | 599.5 |
2019.01.01T00:00:00.700 | 64,950 | 649.5 |
2019.01.01T00:00:00.750 | 69,950 | 699.5 |
2019.01.01T00:00:00.800 | 74,950 | 749.5 |
The avg_x column is empty for the windows output before the new measure was added. The first two avg_x values (525 and 550) equal the averages of 501..549 and 501..599, which suggests that the new measure only takes into account rows inserted after it was added, even when earlier rows fall in the same window.