DStream::timeBucketEngine
Syntax
DStream::timeBucketEngine(timeCutPoints, metrics, timeColumn, [keyColumn],
[useWindowStartTime], [closed='left'], [fill='none'], [keyPurgeFreqInSec=-1],
[parallelism=1])
Details
Creates a time-series aggregation engine that processes data in custom time windows. For details, see createTimeBucketEngine.
Return value: A DStream object.
Arguments
timeCutPoints is a vector of temporal values specifying the boundaries of the custom time windows. Note:
- It must contain no null values.
- The timestamp precision of timeCutPoints must be equal to or coarser than the precision of timeColumn.
- Its precision determines the exact window boundaries (see the sketch below). For example, a minute-precision window [09:00, 09:05) covers data in minutes 09:00 through 09:04, while a second-precision window [09:00:00, 09:05:00) covers data in seconds 09:00:00 through 09:04:59.
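For instance, a minimal sketch of the windows defined by a set of cut points (the values are illustrative):

// Cut points at minute precision.
cutPoints = [09:30m, 10:00m, 10:30m, 11:30m]
// With the default closed='left', these define three windows of unequal length:
// [09:30, 10:00), [10:00, 10:30), and [10:30, 11:30).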
metrics is metacode specifying the calculation formulas. Note:
- It can use one or more built-in or user-defined aggregate functions (which must be defined with the defg keyword), such as <[sum(volume), avg(price)]>; expressions of aggregate functions, such as <[avg(price1)-avg(price2)]>; or aggregate functions involving multiple columns, such as <[std(price1-price2)]>. (See the sketch below.)
- You can specify functions that return multiple values for metrics, such as <func(price) as `col1`col2> (specifying the column names is optional).
- If metrics is a tuple of multiple formulas, windowSize is specified as a vector of the same length as metrics, where each element of windowSize corresponds to an element of metrics. For example, if windowSize=[10,20], metrics can be (<[min(volume), max(volume)]>, <sum(volume)>). metrics can also be a nested tuple vector, such as [[<[min(volume), max(volume)]>, <sum(volume)>], [<avg(volume)>]].
- The column names specified in metrics are not case-sensitive and can be inconsistent in case with the column names of the input tables.
- Nested aggregate function calls are not supported in metrics.
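A minimal sketch of these forms (the function name priceRange and the column names are illustrative):

// A user-defined aggregate function must be declared with the defg keyword.
defg priceRange(x) {
    return max(x) - min(x)
}
// Built-in aggregates, a user-defined aggregate, and an expression of aggregates:
m1 = <[sum(volume), avg(price), priceRange(price)]>
// An aggregate function involving multiple columns:
m2 = <[std(price1 - price2)]>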
timeColumn is a STRING scalar or vector specifying the time column(s) of the subscribed stream table.
Note: If timeColumn is a vector, it must contain a date element (of DATE type) and a time element (of TIME, SECOND, or NANOTIME type). In this case, the first column of the output table takes the data type of concatDateTime(date, time).
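As an illustration, a sketch of a source whose timestamp is split into DATE and SECOND columns (it assumes the catalog setup shown under Examples; the table and sink names are illustrative):

g2 = createStreamGraph('bucketDateTime')
g2.source("ticks", 1000:0, `date`second`sym`price, [DATE, SECOND, SYMBOL, DOUBLE])
.timeBucketEngine(timeCutPoints=[10:00:00, 10:05:00, 10:10:00], metrics=<[avg(price)]>, timeColumn=`date`second, keyColumn=`sym)
.sink("bucketOut")
g2.submit()
// The first column of "bucketOut" takes the type of concatDateTime(date, second), i.e., DATETIME.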
keyColumn (optional) is a STRING scalar/vector indicating the name(s) of the grouping column(s). If it is specified, the engine conducts the calculations within each group. For example, group the data by stock symbol and aggregate each stock separately within each window.
useWindowStartTime (optional) is a Boolean value indicating whether the time column in outputTable is the starting time of the windows. The default value is false, which means the timestamps in the output table are the end time of the windows.
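For example, for the window [10:00, 10:05) in the Examples below:

// useWindowStartTime=false (default) -> output timestamp 2024.10.08 10:05:00.000
// useWindowStartTime=true            -> output timestamp 2024.10.08 10:00:00.000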
closed (optional) is a STRING scalar specifying whether the windows include their left or right boundary. The default value is 'left'.
- closed = 'left': left-closed, right-open, i.e., [left, right)
- closed = 'right': left-open, right-closed, i.e., (left, right]
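To illustrate the two modes, a sketch with illustrative cut points:

// With timeCutPoints=[10:00m, 10:05m, 10:10m]:
// closed='left'  -> windows [10:00, 10:05) and [10:05, 10:10);
//                   a record stamped 10:05:00.000 falls into the second window.
// closed='right' -> windows (10:00, 10:05] and (10:05, 10:10];
//                   the same record falls into the first window.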
fill (optional) specifies how to handle a window that contains no data. The default value is 'none'. It can be (see the sketch below):
- 'none': no result is output for the window.
- 'null': output a null value.
- 'ffill': output the result of the last window.
- a specific value: output the specified value. Its type must match the type of the corresponding metrics output.
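A sketch of the four modes, assuming a keyed engine with illustrative cut points and metrics:

// With timeCutPoints=[10:00m, 10:05m, 10:10m] and metrics=<[sum(volume)]>,
// suppose group A has data only in window [10:00, 10:05). For window [10:05, 10:10):
//   fill='none'  -> no row is output for A
//   fill='null'  -> a row with a NULL sum is output for A
//   fill='ffill' -> a row repeating A's sum from [10:00, 10:05)
//   fill=0       -> a row with sum 0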
keyPurgeFreqInSec (optional) is a positive integer indicating the interval (in seconds) at which to remove groups that have received no incoming data for a long time. If a group has received no data for at least keyPurgeFreqInSec seconds since the last purge, it will be removed. (See the sketch below.)
Note: To specify this parameter, parameter keyColumn must be specified and parameter fill cannot be specified.
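A minimal sketch (it assumes the catalog setup shown under Examples; the table and sink names are illustrative):

g3 = createStreamGraph('purgeDemo')
g3.source("trades2", 1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT])
.timeBucketEngine(timeCutPoints=[10:00m, 10:05m, 10:10m, 10:15m], metrics=<[sum(volume)]>, timeColumn=`time, keyColumn=`sym, keyPurgeFreqInSec=3600)
.sink("purgeOut")
g3.submit()
// Groups (symbols) that receive no data for at least 3600 seconds after the
// last purge are removed from the engine's state.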
parallelism (optional) is a positive integer no greater than 63, indicating the number of worker threads for parallel computation. The default value is 1. For compute-intensive workloads, increasing this value appropriately can make full use of computing resources and reduce computation time. It is recommended to set a value smaller than the number of CPU cores, typically 4 to 8.
Examples
if (!existsCatalog("orca")) {
createCatalog("orca")
}
go
use catalog orca
// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('engine')
g = createStreamGraph('engine')
g.source("trades", 1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT])
.timeSeriesEngine(windowSize=60000, step=60000, metrics=<[first(price), max(price), min(price), last(price), sum(volume)]>, timeColumn=`time, useSystemTime=false, keyColumn=`sym, useWindowStartTime=false)
.timeBucketEngine(timeCutPoints=[10:00m, 10:05m, 10:10m, 10:15m], metrics=<[first(first_price), max(max_price), min(min_price), last(last_price), sum(sum_volume)]>, timeColumn=`time, keyColumn=`sym)
.sink("output")
g.submit()
go
times = [2024.10.08T10:01:01.785, 2024.10.08T10:01:02.125, 2024.10.08T10:01:12.457, 2024.10.08T10:03:10.789, 2024.10.08T10:03:12.005, 2024.10.08T10:08:02.236, 2024.10.08T10:08:04.412, 2024.10.08T10:08:05.152, 2024.10.08T10:08:30.021, 2024.10.08T10:10:20.123, 2024.10.08T10:11:02.236, 2024.10.08T10:13:04.412, 2024.10.08T10:15:12.005]
syms = [`A, `B, `A, `A, `B, `A, `B, `B, `A, `A, `A, `B, `B]
prices = [10.83, 21.73, 10.79, 11.81, 22.96, 11.25, 23.03, 23.18, 11.04, 11.85, 11.06, 23.15, 22.06]
volumes = [2110, 1600, 2850, 2250, 1980, 2400, 2130, 1900, 2300, 2200, 2200, 1880, 2100]
tmp = table(times as time, syms as sym, prices as price, volumes as volume)
appendOrcaStreamTable("trades", tmp)
select * from orca_table.output
time | sym | first_first_price | max_max_price | min_min_price | last_last_price | sum_sum_volume
--- | --- | --- | --- | --- | --- | ---
2024.10.08 10:05:00.000 | A | 10.83 | 11.81 | 10.79 | 11.81 | 7,210
2024.10.08 10:05:00.000 | B | 21.73 | 22.96 | 21.73 | 22.96 | 3,580
2024.10.08 10:10:00.000 | A | 11.25 | 11.25 | 11.04 | 11.04 | 4,700
2024.10.08 10:10:00.000 | B | 23.03 | 23.18 | 23.03 | 23.18 | 4,030
2024.10.08 10:15:00.000 | B | 23.15 | 23.15 | 23.15 | 23.15 | 1,880