DStream::timeSeriesEngine
Syntax
DStream::timeSeriesEngine(windowSize, step, metrics, [timeColumn],
[useSystemTime=false], [keyColumn], [updateTime], [useWindowStartTime],
[roundTime=true], [fill='none'], [forceTriggerTime], [keyPurgeFreqInSecond=-1],
[closed='left'], [subWindow], [parallelism=1], [acceptedDelay=0])
Details
Creates a time-series streaming engine. For details, see createTimeSeriesEngine.
Return value: A DStream object.
Arguments
windowSize is a scalar or vector with positive integers that specifies the size of the windows for calculation.
step is a positive integer indicating how far each window moves forward relative to the previous one. Note that windowSize must be an integer multiple of step; otherwise an exception is thrown.
- If useSystemTime=true, the unit of windowSize and step is millisecond.
- If useSystemTime=false, the unit of windowSize and step is the same as the unit of timeColumn.
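To make the relationship between windowSize and step concrete, the following Python sketch lists the sliding windows that contain a given timestamp. It is a simulation for illustration only, not engine code: the function name windows_containing is hypothetical, and it assumes windows are left-closed and aligned to multiples of step, which may differ from the engine's actual alignment rules.

```python
def windows_containing(ts, window_size, step):
    """Return the (start, end) windows that contain ts.

    Assumptions (not taken from the engine): windows are left-closed,
    right-open, and every window end is a multiple of step.
    """
    if window_size % step != 0:
        raise ValueError("window_size must be an integer multiple of step")
    # First window end strictly greater than ts.
    first_end = (ts // step + 1) * step
    return [(end - window_size, end)
            for end in range(first_end, first_end + window_size, step)]


# With window_size=6 and step=3, each timestamp falls into 6/3 = 2 windows.
print(windows_containing(7, 6, 3))
```

Under these assumptions each record participates in windowSize/step windows, which is one way to see why the two values need to divide evenly.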
metrics is metacode or a tuple of metacode specifying the calculation formulas.
- It can use one or more built-in or user-defined aggregate functions (which must be defined with the defg keyword), such as <[sum(volume), avg(price)]>; expressions of aggregate functions, such as <[avg(price1)-avg(price2)]>; or aggregate functions involving multiple columns, such as <[std(price1-price2)]>.
- You can specify functions that return multiple values for metrics, such as <func(price) as `col1`col2> (specifying the column names is optional).
- If metrics is a tuple with multiple formulas, windowSize is specified as a vector of the same length as metrics. Each element of windowSize corresponds to the element of metrics at the same position. For example, if windowSize=[10,20], metrics can be (<[min(volume), max(volume)]>, <sum(volume)>). metrics can also take nested tuples, such as [[<[min(volume), max(volume)]>, <sum(volume)>], [<avg(volume)>]].
- The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.
- Nested aggregate function calls are not supported in metrics.
timeColumn (optional) is a STRING scalar or vector specifying the time column(s) of the subscribed stream table. When useSystemTime = false, it must be specified.
useSystemTime (optional) is a Boolean value indicating whether windows are computed based on the system time at which data is ingested into the engine.
- useSystemTime = true: the engine windows the streaming data at fixed intervals according to the ingestion time of each record (local system time with millisecond precision, independent of any temporal columns in the stream table). As long as a window contains data, the calculation is performed automatically when the window ends. The first column of the output table indicates the timestamp at which the calculation occurred.
- useSystemTime = false (default): the engine windows the streaming data according to timeColumn in the stream table. The calculation for a window is triggered by the first record that arrives after that window ends. Note that the record that triggers the calculation does not participate in it.
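The trigger rule for useSystemTime = false can be sketched as a small Python simulation. This is not the engine's implementation: tumbling_sum is a hypothetical helper, timestamps are plain integer milliseconds, windows are assumed to be left-closed and aligned to multiples of the window size, only the windowSize = step case is covered, and late records are simply dropped.

```python
def tumbling_sum(records, window_size):
    """Simulate per-group tumbling windows driven by event time.

    records: iterable of (timestamp, key, value) in arrival order.
    Returns a list of (window_end, key, sum) in the order windows are emitted.
    """
    open_windows = {}  # key -> [window_start, running_sum]
    out = []
    for ts, key, value in records:
        start = ts - ts % window_size
        current = open_windows.get(key)
        if current is not None and start < current[0]:
            continue  # simplification: drop records older than the open window
        if current is not None and start > current[0]:
            # First record past the open window: emit that window.
            # The triggering record is NOT part of the emitted result.
            out.append((current[0] + window_size, key, current[1]))
            current = None
        if current is None:
            open_windows[key] = [start, value]
        else:
            current[1] += value
    return out


# Milliseconds since 01:00:00.000, mirroring the data in the Examples section.
records = [
    (61785, "A", 10), (62125, "B", 26), (70263, "B", 14), (72457, "A", 28),
    (130789, "A", 15), (132005, "B", 9), (150021, "A", 10),
    (242236, "A", 29), (244412, "B", 32), (245152, "B", 23),
]
result = tumbling_sum(records, 60000)
```

In this sketch the windows that are still open when the input ends (those starting at 240000) are never emitted, because no later record arrives to trigger them.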
keyColumn (optional) is a STRING scalar/vector indicating the name of the grouping column(s). If it is specified, the engine conducts the calculations within each group. For example, group the data by stock symbol and apply moving aggregation functions to each stock.
updateTime (optional) is a non-negative integer which takes the same time precision as timeColumn. It is used to trigger window calculations at an interval shorter than step. step must be a multiple of updateTime. To specify updateTime, useSystemTime must be set to false.
If updateTime is not specified, the calculation for a window does not occur before the window ends. By specifying updateTime, you can calculate values several times within a window whose calculation has not been triggered for a long time.
useWindowStartTime (optional) is a Boolean value indicating whether the time column in the output table is the start time of the windows. The default value is false, which means the timestamps in the output table are the end time of the windows. If windowSize is a vector, useWindowStartTime must be false.
roundTime (optional) is a Boolean value indicating how window boundaries are aligned when the time precision is milliseconds or seconds and step is greater than one minute. The default value is true, which means alignment follows the multi-minute rule (see the alignment rules). false means alignment follows the one-minute rule.
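fill (optional) specifies how to handle an empty window (within a group). The default value is 'none'. It can be:
- 'none': no result is output.
- 'null': output a null value.
- 'ffill': output the result of the last window.
- specific value: output the specified value. Its type must match the type of the corresponding metrics output.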
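The effect of the fill options above on empty windows can be illustrated with a short Python sketch. It is a simulation under stated assumptions rather than engine code: apply_fill is a hypothetical helper, an empty window is represented by None, and the sample values are made up for illustration.

```python
def apply_fill(window_results, fill="none"):
    """Apply a fill policy to a time-ordered list of (window_end, value).

    A value of None marks an empty window. Any fill other than
    'none', 'null' or 'ffill' is treated as the specific value to output.
    """
    out = []
    last = None  # result of the most recent non-empty window
    for end, value in window_results:
        if value is not None:
            out.append((end, value))
            last = value
        elif fill == "none":
            continue                  # no row for the empty window
        elif fill == "null":
            out.append((end, None))   # null result
        elif fill == "ffill":
            out.append((end, last))   # repeat the last window's result
        else:
            out.append((end, fill))   # user-specified value
    return out


# Hypothetical per-window sums for one group; the third window is empty.
windows = [(120000, 38), (180000, 25), (240000, None), (300000, 84)]
print(apply_fill(windows, "ffill"))
```

With 'none' the sketch produces three rows; the other options produce four, differing only in the value shown for the empty window.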
forceTriggerTime (optional) is a non-negative integer which takes the same time precision as timeColumn, indicating the waiting time to force trigger calculation in the uncalculated windows for each group. If forceTriggerTime is set, useSystemTime must be false and updateTime cannot be specified.
The rules are as follows:
(1) Suppose the end time of the uncalculated window is t, and an incoming record of another group arrives at t1: when t1-t>=forceTriggerTime, calculation of the window will be triggered.
(2) If no data is ingested into a group after its last window is calculated while new data continues to arrive in other groups, the fill parameter can be used to fill results for the empty windows of that group. The group's windows will still be output at the latest time point. If fill is not specified, no new windows will be generated for that group after its last window has been triggered for computation.
Note the following points when setting forceTriggerTime or updateTime:
- If updateTime is specified, the result of the current window calculation will be updated again when data belonging to the current window still arrives after the calculation is triggered.
- If forceTriggerTime is specified, the incoming data with a timestamp within the current window will be discarded after the calculation is forced to be triggered.
keyPurgeFreqInSec (optional) is a positive integer indicating the interval (in seconds) to remove groups with no incoming data for a long time. If a group has no incoming data for at least keyPurgeFreqInSec seconds after the last time of data purging, it will be removed.
Note: To specify this parameter, parameter forceTriggerTime must be specified and parameter fill cannot be specified.
You can check the number of groups in a time-series streaming engine based on the column "numGroups" returned by getStreamEngineStat.
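closed (optional) is a string indicating whether the left or the right boundary is included in the window. The default value is 'left'.
- closed = 'left': left-closed, right-open
- closed = 'right': left-open, right-closed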
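The two boundary conventions above differ only in how records that fall exactly on a window edge are treated. A minimal Python sketch, using a hypothetical in_window helper and integer timestamps:

```python
def in_window(ts, start, end, closed="left"):
    """Return True if ts belongs to the window bounded by start and end."""
    if closed == "left":
        return start <= ts < end    # left-closed, right-open
    if closed == "right":
        return start < ts <= end    # left-open, right-closed
    raise ValueError("closed must be 'left' or 'right'")


# A record exactly on the window start is included only with closed='left';
# a record exactly on the window end is included only with closed='right'.
print(in_window(60000, 60000, 120000, "left"))
print(in_window(120000, 60000, 120000, "right"))
```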
subWindow (optional) is a pair of integers or DURATION values indicating the range of the subwindow within the window specified by windowSize. If specified, only results calculated within subwindows are returned, and the time column of the output table displays the end time of each subwindow. The calculation of a subwindow is triggered by the arrival of the next record after the subwindow ends. The boundary of the subwindow is determined by parameter closed. When subWindow is a pair of integers, it takes the same time precision as timeColumn.
parallelism (optional) is a positive integer no greater than 63, representing the number of worker threads for parallel computation. The default value is 1. For compute-intensive workloads, adjusting this parameter appropriately can effectively utilize computing resources and reduce computation time. It is recommended to set a value less than the number of CPU cores, normally from 4 to 8.
acceptedDelay (optional) is an integer specifying how long each window keeps accepting data after it ends. The default value is 0.
- When useSystemTime=true, data received within the acceptedDelay time after the window ends is still considered part of the current window and participates in its computation; it is not included in the computation of the next window.
- When useSystemTime=false, a window with t as its right boundary waits until a record with a timestamp equal to or later than t + acceptedDelay arrives. When such a record arrives, the current window closes and performs a calculation on all records within the window frame. This handles scenarios with out-of-order data.
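For the useSystemTime=false case above, the following Python sketch shows how a non-zero acceptedDelay lets a window pick up out-of-order records. It simulates a single left-closed window and is not engine code: window_sum_with_delay is a hypothetical helper and the timestamps are made-up integers.

```python
def window_sum_with_delay(records, start, end, accepted_delay):
    """Sum the values in window [start, end) using event-time closing.

    records: iterable of (timestamp, value) in arrival order.
    The window closes when a record with timestamp >= end + accepted_delay
    arrives; records arriving after that point are ignored.
    Returns None if the window never closes.
    """
    total = 0
    for ts, value in records:
        if ts >= end + accepted_delay:
            return total            # closing record: compute and stop
        if start <= ts < end:
            total += value          # record belongs to this window
    return None


# Arrival order is not timestamp order: the record at 8 arrives after 11.
arrivals = [(3, 1), (11, 100), (8, 2), (15, 100), (9, 4)]
without_delay = window_sum_with_delay(arrivals, 0, 10, 0)
with_delay = window_sum_with_delay(arrivals, 0, 10, 5)
```

Without a delay, the record at timestamp 11 closes the window before the late record at 8 arrives. With a delay of 5, the window stays open until the record at 15, so the record at 8 is included, while the record at 9 still arrives too late.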
Examples
if (!existsCatalog("orca")) {
    createCatalog("orca")
}
go
use catalog orca
// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('engine')
g = createStreamGraph('engine')
g.source("trades", 1000:0, ["time","sym","volume"], [TIMESTAMP, SYMBOL, INT])
.timeSeriesEngine(windowSize=60000, step=60000, metrics=<[sum(volume)]>, timeColumn="time", useSystemTime=false, keyColumn="sym", useWindowStartTime=false)
.sink("output")
g.submit()
go
times = [2018.10.08T01:01:01.785, 2018.10.08T01:01:02.125, 2018.10.08T01:01:10.263, 2018.10.08T01:01:12.457, 2018.10.08T01:02:10.789, 2018.10.08T01:02:12.005, 2018.10.08T01:02:30.021, 2018.10.08T01:04:02.236, 2018.10.08T01:04:04.412, 2018.10.08T01:04:05.152]
syms = [`A, `B, `B, `A, `A, `B, `A, `A, `B, `B]
volumes = [10, 26, 14, 28, 15, 9, 10, 29, 32, 23]
tmp = table(times as time, syms as sym, volumes as volume)
appendOrcaStreamTable("trades", tmp)
select * from orca_table.output
time | sym | sum_volume |
---|---|---|
2018.10.08 01:02:00.000 | A | 38 |
2018.10.08 01:02:00.000 | B | 40 |
2018.10.08 01:03:00.000 | A | 25 |
2018.10.08 01:03:00.000 | B | 9 |