DStream::timeBucketEngine

Syntax

DStream::timeBucketEngine(timeCutPoints, metrics, timeColumn, [keyColumn], [useWindowStartTime], [closed='left'], [fill='none'], [keyPurgeFreqInSecond=-1], [parallelism=1])

Details

Creates a time-series aggregation engine that processes data in custom time windows. For details, see createTimeBucketEngine.

Return value: A DStream object.

Arguments

timeCutPoints is a vector of MINUTE or SECOND type defining window boundaries. Each adjacent pair of elements forms a window. Note:
  • Must contain no null values.
  • The timestamp precision of timeCutPoints must be equal to or coarser than the precision of timeColumn.
  • Its precision determines the exact window boundary behavior. For example, minute-precision window [09:00, 09:05) excludes data ≥ 09:04:00; second-precision window [09:00:00, 09:05:00) excludes data ≥ 09:04:59.
metrics is metacode or a tuple specifying the calculation formulas. For more information about metacode please refer to Metaprogramming.
  • It can use one or more built-in or user-defined aggregate functions (which must be defined by the defg keyword) such as <[sum(volume), avg(price)]>, or expressions of aggregate functions such as as <[avg(price1)-avg(price2)]>, or aggregate functions involving multiple columns such as <[std(price1-price2)]>.
  • You can specify functions that return multiple values for metrics, such as <func(price) as `col1`col2> (it's optional to specify the column names).
  • If metrics is a tuple with multiple formulas, windowSize is specified as a vector of the same length as metrics. Each element of windowSize corresponds to the elements in metrics. For example, if windowSize=[10,20], metrics can be (<[min(volume), max(volume)]>, <sum(volume)>). metrics can also input nested tuple vectors, such as [[<[min(volume), max(volume)]>, <sum(volume)>], [<avg(volume)>]].
Note:
  • The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.
  • Nested aggregate function calls are not supported in metrics.

timeColumn is a STRING scalar or vector specifying the time column(s) of the subscribed stream table.

Note: If timeColumn is a vector, it must have a date element (of DATE type) and a time element (of TIME, SECOND or NANOTIME type). In this case, the first column in outputTable must take the data type of concatDateTime(date, time).

keyColumn (optional) is a STRING scalar/vector indicating the name of the grouping column(s). If it is specified, the engine conducts the calculations within each group. For example, group the data by stock symbol and apply moving aggregation functions to each stock.

useWindowStartTime (optional) is a Boolean value indicating whether the time column in outputTable is the starting time of the windows. The default value is false, which means the timestamps in the output table are the end time of the windows.

closed (optional) is a STRING indicating whether the left or the right boundary is included.
  • closed = 'left': left-closed, right-open
  • closed = 'right': left-open, right-closed
fill (optional) is a vector/scalar indicating the filling method to deal with an empty window (in a group). It can be:
  • 'none': no result
  • 'null': output a null value.
  • 'ffill': output the result in the last window.
  • specific value: output the specified value. Its type should be the same as metrics output's type.
fill could be a vector to specify different filling method for each metric. The size of the vector must be consistent with the number of elements specified in metrics. The element in vector cannot be 'none'.

keyPurgeFreqInSec (optional) is a positive integer indicating the interval (in seconds) to remove groups with no incoming data for a long time. If a group has no incoming data for at least keyPurgeFreqInSec seconds after the last time of data purging, it will be removed.

Note: To specify this parameter, parameter keyColumn must be specified and parameter fill cannot be specified.

parallelism (optional) is a positive integer no greater than 63, representing the number of worker threads for parallel computation. The default value is 1. For compute-intensive workloads, adjusting this parameter appropriately can effectively utilize computing resources and reduce computation time. It is recommended to set a value less than the number of CPU cores, normally from 4 to 8.

Examples

if (!existsCatalog("orca")) {
	createCatalog("orca")
}
go
use catalog orca

// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('engine')
g = createStreamGraph('engine')

g.source("trades", 1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT])
.timeSeriesEngine(windowSize=60000, step=60000, metrics=<[first(price), max(price), min(price), last(price), sum(volume)]>, timeColumn=`time, useSystemTime=false, keyColumn=`sym, useWindowStartTime=false)
.timeBucketEngine(timeCutPoints=[10:00m, 10:05m, 10:10m, 10:15m], metrics=<[first(first_price), max(max_price), min(min_price), last(last_price), sum(sum_volume)]>, timeColumn=`time,  keyColumn=`sym)
.sink("output")
g.submit()
go

times = [2024.10.08T10:01:01.785, 2024.10.08T10:01:02.125, 2024.10.08T10:01:12.457, 2024.10.08T10:03:10.789, 2024.10.08T10:03:12.005, 2024.10.08T10:08:02.236, 2024.10.08T10:08:04.412, 2024.10.08T10:08:05.152, 2024.10.08T10:08:30.021, 2024.10.08T10:10:20.123, 2024.10.08T10:11:02.236, 2024.10.08T10:13:04.412, 2024.10.08T10:15:12.005]
syms = [`A, `B, `A, `A, `B, `A, `B, `B, `A, `A, `A, `B, `B]
prices = [10.83, 21.73, 10.79, 11.81, 22.96, 11.25, 23.03, 23.18, 11.04, 11.85, 11.06, 23.15, 22.06]
volumes = [2110, 1600, 2850, 2250, 1980, 2400, 2130, 1900, 2300, 2200, 2200, 1880, 2100]
tmp = table(times as time, syms as sym, prices as price, volumes as volume)
appendOrcaStreamTable("trades", tmp)

select * from orca_table.output
time sym first_first_price max_max_price min_min_price last_last_price sum_sum_volume
2024.10.08 10:05:00.000 A 10.83 11.81 10.79 11.81 7,210
2024.10.08 10:05:00.000 B 21.73 22.96 21.73 22.96 3,580
2024.10.08 10:10:00.000 A 11.25 11.25 11.04 11.04 4,700
2024.10.08 10:10:00.000 B 23.03 23.18 23.03 23.18 4,030
2024.10.08 10:15:00.000 B 23.15 23.15 23.15 23.15 1,880