addMetrics

Syntax

addMetrics(engine, newMetrics, newMetricsSchema, [windowSize], [fill])

Alias: extendMetrics

Arguments

engine is the abstract table object returned by a streaming engine function such as createTimeSeriesEngine. Note that addMetrics cannot be used with engines created by createAnomalyDetectionEngine or createReactiveStateEngine.

newMetrics is metacode indicating the new metrics to be calculated by the streaming engine. The metacode can include one or more expressions, built-in or user-defined functions, or a constant scalar/vector. Note that the output column for a constant vector must be in array vector form.

newMetricsSchema is a table object specifying the column names and data types of the new metrics in the output table.

windowSize (optional) is a positive integer indicating the window length used to calculate the new metrics. It applies only to the time-series engine and must be one of the windowSize values already defined for that engine. The default value is the first windowSize value of the existing time-series engine.

fill (optional) is a scalar or vector specifying how to handle an empty window (within a group). It can be:

  • 'none': output no result.

  • 'null': output a NULL value.

  • 'ffill': output the result of the last window.

  • a specific value: output the specified value. Its type should match the output type of the corresponding metric.

fill can be a vector that specifies a different filling method for each metric. The length of the vector must equal the number of metrics specified in newMetrics, and no element of the vector can be 'none'.
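The four fill options can be compared with a small model. The following is a minimal sketch in Python, not DolphinDB script and not the engine's implementation; the names apply_fill and EMPTY are hypothetical and exist only for this illustration. It assumes that 'ffill' repeats the most recent non-empty result.

```python
EMPTY = object()  # hypothetical marker: the window received no rows


def apply_fill(windows, fill):
    """Model the rows emitted for one metric in one group.

    windows: list of (window_end, value) pairs, where value is EMPTY
             for a window that contains no data.
    fill:    'none', 'null', 'ffill', or a specific value.
    """
    out = []
    last = None  # most recent non-empty result, used by 'ffill'
    for end, value in windows:
        if value is not EMPTY:
            out.append((end, value))
            last = value
        elif fill == "none":
            continue                    # no row is produced
        elif fill == "null":
            out.append((end, None))     # NULL result
        elif fill == "ffill":
            out.append((end, last))     # repeat the previous result
        else:
            out.append((end, fill))     # the specified value
    return out


windows = [(50, 10), (100, EMPTY), (150, 30)]
print(apply_fill(windows, "none"))
print(apply_fill(windows, "null"))
print(apply_fill(windows, "ffill"))
print(apply_fill(windows, 0))
```

Only 'none' changes the number of output rows; the other options keep one row per window and differ in the value they place in the empty one.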

Details

Dynamically add new metrics to a streaming engine.

Examples

Calculate the sum of column x with a streaming engine:

share streamTable(10000:0,`time`id`x, [TIMESTAMP,SYMBOL,INT]) as t
output1 = table(10000:0, `time`sum_x, [TIMESTAMP,INT])
agg1 = createTimeSeriesEngine(name=`agg1, windowSize=100, step=50, metrics=<sum(x)>, dummyTable=t, outputTable=output1, timeColumn=`time)
subscribeTable(tableName="t", actionName="agg1", offset=0, handler=append!{agg1}, msgAsTable=true)
n=500
time=2019.01.01T00:00:00.000+(1..n)
id=take(`ABC`DEF, n)
x=1..n
insert into t values(time, id, x);
select * from output1;
time sum_x
2019.01.01T00:00:00.050 1,225
2019.01.01T00:00:00.100 4,950
2019.01.01T00:00:00.150 9,950
2019.01.01T00:00:00.200 14,950
2019.01.01T00:00:00.250 19,950
2019.01.01T00:00:00.300 24,950
2019.01.01T00:00:00.350 29,950
2019.01.01T00:00:00.400 34,950
2019.01.01T00:00:00.450 39,950
2019.01.01T00:00:00.500 44,950

Now add a new metric, avg(x), to the streaming engine. In the output table, the new column is named avg_x and its data type is DOUBLE.

newMetricsSchema= table(1:0, [`avg_x], [DOUBLE])
addMetrics(agg1, <avg(x)>, newMetricsSchema);
n=300
time=2019.01.01T00:00:00.500+(1..n)
id=take(`ABC`DEF, n)
x=500+1..n
insert into t values(time, id, x);
select * from output1;
time sum_x avg_x
2019.01.01T00:00:00.050 1,225
2019.01.01T00:00:00.100 4,950
2019.01.01T00:00:00.150 9,950
2019.01.01T00:00:00.200 14,950
2019.01.01T00:00:00.250 19,950
2019.01.01T00:00:00.300 24,950
2019.01.01T00:00:00.350 29,950
2019.01.01T00:00:00.400 34,950
2019.01.01T00:00:00.450 39,950
2019.01.01T00:00:00.500 44,950
2019.01.01T00:00:00.550 49,950 525
2019.01.01T00:00:00.600 54,950 550
2019.01.01T00:00:00.650 59,950 599.5
2019.01.01T00:00:00.700 64,950 649.5
2019.01.01T00:00:00.750 69,950 699.5
2019.01.01T00:00:00.800 74,950 749.5
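The printed values can be checked by hand. The sketch below is written in Python rather than DolphinDB script and only reproduces the arithmetic; closed_windows and ADDED_AFTER are hypothetical names. It assumes that each window covers [end - 100 ms, end) and that avg_x is computed only from rows ingested after the addMetrics call. The second assumption is an inference from the output above (avg_x is 525 rather than 499.5 at 00:00:00.550), not a documented rule.

```python
def closed_windows(rows, window_size=100, step=50):
    """Yield (window_end, rows in [window_end - window_size, window_end))."""
    last_t = max(t for t, _ in rows)
    for end in range(step, last_t + 1, step):
        yield end, [(t, x) for t, x in rows if end - window_size <= t < end]


# Both batches combined: t is milliseconds after 00:00:00.000 and x equals t.
rows = [(t, t) for t in range(1, 801)]
ADDED_AFTER = 500  # rows with t <= 500 were ingested before addMetrics

sum_x, avg_x = {}, {}
for end, win in closed_windows(rows):
    sum_x[end] = sum(x for _, x in win)
    # Assumption: the new metric only sees rows that arrived after it was added.
    new = [x for t, x in win if t > ADDED_AFTER]
    if new:
        avg_x[end] = sum(new) / len(new)

for end in sorted(sum_x):
    print(end, sum_x[end], avg_x.get(end, ""))
```

Under these assumptions the first two avg_x values cover only x = 501..549 and x = 501..599, which is why they differ from the average over the full window; from 00:00:00.650 onward every row in the window arrived after the metric was added.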