DStream::dailyTimeSeriesEngine
Syntax
DStream::dailyTimeSeriesEngine(windowSize, step, metrics, sessionBegin,
[sessionEnd], [timeColumn], [useSystemTime=false], [keyColumn], [updateTime],
[useWindowStartTime], [roundTime=true], [fill='none'], [mergeSessionEnd=false],
[forceTriggerTime], [forceTriggerSessionEndTime], [keyPurgeFreqInSec],
[closed='left'], [subWindow], [parallelism=1], [acceptedDelay=0],
[keyPurgeDaily=true])
Details
Creates a daily time-series streaming engine. For details, see createDailyTimeSeriesEngine.
Return value: A DStream object.
Arguments
windowSize is a scalar or vector of positive integers that specifies the size of the windows for calculation.
step is a positive integer indicating how far each window moves forward relative to the previous one. Note that windowSize must be a multiple of step; otherwise an exception is thrown.
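As a rough illustration of how windowSize and step relate, the following Python sketch (a model for explanation only, not DolphinDB code; `window_bounds` is a hypothetical helper) rejects a windowSize that is not a multiple of step and lists nominal window ranges whose right boundaries advance by step. Times are plain integers in the unit of the time column. The sketch assumes the first window ends one step after the session start, which matches the example at the end of this page, and it does not model roundTime alignment or how the engine treats the part of a window that falls before sessionBegin.

```python
def window_bounds(window_size, step, session_begin, session_end):
    """Return nominal (start, end) pairs; window ends advance by `step`."""
    if window_size <= 0 or step <= 0:
        raise ValueError("windowSize and step must be positive integers")
    if window_size % step != 0:
        raise ValueError("windowSize must be a multiple of step")
    bounds = []
    end = session_begin + step  # assumption: first window ends one step in
    while end <= session_end:
        # Nominal start; may precede session_begin when window_size > step.
        bounds.append((end - window_size, end))
        end += step
    return bounds


# 09:30:00 is 34200 seconds after midnight; one-minute tumbling windows.
print(window_bounds(60, 60, 34200, 34380))
# [(34200, 34260), (34260, 34320), (34320, 34380)]
```

With windowSize=60 and step=60 the windows do not overlap. With windowSize=120 and step=60 each window would overlap the previous one by 60 seconds.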
metrics is metacode or a tuple of metacode specifying the calculation formulas.
- It can use one or more built-in or user-defined aggregate functions (which must be defined with the defg keyword), such as <[sum(volume), avg(price)]>; expressions of aggregate functions, such as <[avg(price1)-avg(price2)]>; or aggregate functions involving multiple columns, such as <[std(price1-price2)]>.
- You can specify functions that return multiple values for metrics, such as <func(price) as `col1`col2> (specifying the column names is optional).
- If metrics is a tuple with multiple formulas, windowSize is specified as a vector of the same length as metrics. Each element of windowSize corresponds to the element of metrics at the same position. For example, if windowSize=[10,20], metrics can be (<[min(volume), max(volume)]>, <sum(volume)>). metrics can also be a nested tuple, such as [[<[min(volume), max(volume)]>, <sum(volume)>], [<avg(volume)>]].
- The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.
- Nested aggregate function calls are not supported in metrics.
sessionBegin can be a scalar or vector of type SECOND, TIME or NANOTIME corresponding to the data type of the time column, indicating the starting time of each session. If it is a vector, it must be increasing.
sessionEnd (optional) can be a scalar or vector of type SECOND, TIME or NANOTIME corresponding to the data type of the time column, indicating the end time of each session. Specify sessionEnd as 00:00:00 to indicate the beginning of the next day (i.e., 24:00:00 of the current day).
timeColumn (optional) is a STRING scalar or vector specifying the time column(s) of the subscribed stream table. When useSystemTime = false, it must be specified.
useSystemTime (optional) is a Boolean value indicating whether windows are determined by the system time at which each record is ingested into the engine.
- useSystemTime = true: the engine windows the streaming data at fixed time intervals according to the ingestion time of each record (local system time with millisecond precision, independent of any temporal columns in the stream table). As long as a window contains data, the calculation is performed automatically when the window ends. The first column in the output table indicates the timestamp when the calculation occurred.
- useSystemTime = false (default): the engine windows the streaming data according to timeColumn in the stream table. The calculation for a window is triggered by the first record that arrives after the window ends. Note that the record which triggers the calculation does not participate in it.
keyColumn (optional) is a STRING scalar/vector indicating the name of the grouping column(s). If it is specified, the engine conducts the calculations within each group. For example, group the data by stock symbol and apply moving aggregation functions to each stock.
updateTime (optional) is a non-negative integer which takes the same time precision as timeColumn. It is used to trigger window calculations at an interval shorter than step. step must be a multiple of updateTime. To specify updateTime, useSystemTime must be set to false.
useWindowStartTime (optional) is a Boolean value indicating whether the time column in outputTable is the starting time of the windows. The default value is false, which means the timestamps in the output table are the end time of the windows. If the windowSize is a vector, useWindowStartTime must be false.
roundTime (optional) is a Boolean value indicating the method to align the window boundary if the time precision is milliseconds or seconds and step is bigger than one minute. The default value is true indicating the alignment is based on the multi-minute rule (see the alignment rules). False means alignment is based on the one-minute rule.
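fill (optional) is a scalar or vector specifying how to handle an empty window (within a group). It can be:
- 'none' (default): no result is output.
- 'null': output a null value.
- 'ffill': output the result of the previous window.
- specific value: output the specified value. Its type must match the output type of the corresponding metric.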
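The fill options can be pictured with a small Python model (an illustrative sketch, not the engine's implementation; `apply_fill` and its input layout are hypothetical). It takes per-window results in which `None` marks a window that received no data, and returns the rows that would be emitted under each option.

```python
def apply_fill(results, fill="none"):
    """results: list of (window_end, value); value None marks an empty window."""
    out = []
    last = None  # most recent non-empty result, used by 'ffill'
    for end, value in results:
        if value is not None:
            out.append((end, value))
            last = value
        elif fill == "none":
            continue                    # emit nothing for the empty window
        elif fill == "null":
            out.append((end, None))     # emit a null result
        elif fill == "ffill":
            out.append((end, last))     # repeat the previous result
        else:
            out.append((end, fill))     # emit the user-specified value
    return out


rows = [(1, 10), (2, None), (3, 7)]
print(apply_fill(rows, "none"))   # [(1, 10), (3, 7)]
print(apply_fill(rows, "ffill"))  # [(1, 10), (2, 10), (3, 7)]
```

In this model, 'ffill' on an empty window with no earlier result yields a null value; whether the engine behaves the same way in that edge case is not stated here.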
mergeSessionEnd (optional) is a Boolean value. This parameter is only applicable when closed = 'left'. It determines whether the record arriving at the end of a session will be included in the calculation of the last window of that session. The default value is false, which means the record will not be included in the last window but will trigger its calculation. If the current session is not the last session of the day, the record will participate in the calculation of the first window of the next session.
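To make the effect of mergeSessionEnd concrete, the Python sketch below (a simplified model with a hypothetical helper `last_window_sum`, not DolphinDB code) sums the last window of a session under closed = 'left'. The three records are the 11:29:46, 11:29:50 and 11:30:00 rows from the example at the end of this page, with times expressed as seconds after midnight.

```python
def last_window_sum(records, window_start, session_end, merge_session_end=False):
    """Sum values in [window_start, session_end), optionally including
    records stamped exactly at session_end."""
    total = 0
    for t, value in records:
        in_window = window_start <= t < session_end
        at_session_end = merge_session_end and t == session_end
        if in_window or at_session_end:
            total += value
    return total


# 11:29:00 -> 41340, 11:29:46 -> 41386, 11:29:50 -> 41390, 11:30:00 -> 41400
records = [(41386, 30), (41390, 11), (41400, 14)]
print(last_window_sum(records, 41340, 41400, merge_session_end=True))   # 55
print(last_window_sum(records, 41340, 41400, merge_session_end=False))  # 41
```

The value 55 agrees with the second output row of the example, which sets mergeSessionEnd=true.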
forceTriggerTime (optional) is a non-negative integer which takes the same time precision as timeColumn, indicating the waiting time to force trigger calculation in the uncalculated windows for each group. If forceTriggerTime is set, useSystemTime must be false and updateTime cannot be specified.
forceTriggerSessionEndTime (optional) is a positive integer. The unit of forceTriggerSessionEndTime is consistent with the precision of timeColumn. It indicates the waiting time to force trigger calculation in the window containing the sessionEnd, if it ends without calculation.
keyPurgeFreqInSec (optional) is a positive integer indicating the interval (in seconds) to remove groups with no incoming data for a long time. If a group has no incoming data for at least keyPurgeFreqInSec seconds after the last time of data purging, it will be removed.
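closed (optional) is a STRING scalar indicating which window boundary is inclusive. The default value is 'left'.
- closed = 'left': left-closed, right-open
- closed = 'right': left-open, right-closed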
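The two boundary conventions differ only in which edge of the window is inclusive. A minimal Python sketch of the membership test (the helper `in_window` is hypothetical and only illustrates the interval notation):

```python
def in_window(t, start, end, closed="left"):
    """Return True if timestamp t belongs to the window from start to end."""
    if closed == "left":
        return start <= t < end   # [start, end)
    if closed == "right":
        return start < t <= end   # (start, end]
    raise ValueError("closed must be 'left' or 'right'")


print(in_window(60, 0, 60, closed="left"))   # False
print(in_window(60, 0, 60, closed="right"))  # True
```

A record stamped exactly on a boundary therefore falls into the later window with closed = 'left' and into the earlier window with closed = 'right'.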
subWindow (optional) is a pair of integers or DURATION values indicating the range of the subwindow within the window specified by windowSize. If specified, only results calculated within subwindows are returned, and the time column of the output table displays the end time of each subwindow. The calculation of a subwindow is triggered by the arrival of the next record after the subwindow ends. The boundary of the subwindow is determined by the parameter closed. When subWindow is a pair of integers, it takes the same time precision as timeColumn.
parallelism (optional) is a positive integer no greater than 63, representing the number of worker threads for parallel computation. The default value is 1. For compute-intensive workloads, adjusting this parameter appropriately can effectively utilize computing resources and reduce computation time. It is recommended to set a value less than the number of CPU cores, normally from 4 to 8.
acceptedDelay (optional) is a non-negative integer specifying the maximum delay for each window to accept data. The default value is 0.
- When useSystemTime=true, data received within acceptedDelay after the window ends is still treated as part of the current window and participates in its computation; it is not included in the computation of the next window.
- When useSystemTime=false, a window with t as its right boundary waits until a record with a timestamp equal to or later than t + acceptedDelay arrives. When such a record arrives, the current window closes and performs a calculation on all records within the window frame. This handles scenarios with out-of-order data.
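For the useSystemTime=false case, the waiting rule can be sketched in Python as follows (a simplified single-window model with hypothetical helpers, assuming closed = 'left' and records given in arrival order; it is not the engine's implementation). The window closes at the first record whose timestamp is at least t + acceptedDelay, and only records that arrived before that point are aggregated.

```python
def closing_index(timestamps, window_end, accepted_delay=0):
    """Index of the first record with timestamp >= window_end + accepted_delay."""
    for i, ts in enumerate(timestamps):
        if ts >= window_end + accepted_delay:
            return i
    return None  # window has not been triggered yet


def window_sum(records, start, end, accepted_delay=0):
    """Sum values in [start, end) among records that arrived before the close."""
    i = closing_index([t for t, _ in records], end, accepted_delay)
    if i is None:
        return None
    return sum(v for t, v in records[:i] if start <= t < end)


# Arrival order; the record stamped 8 arrives late, after the one stamped 12.
records = [(1, 5), (12, 1), (8, 3), (15, 2)]
print(window_sum(records, 0, 10, accepted_delay=0))  # 5
print(window_sum(records, 0, 10, accepted_delay=3))  # 8
```

With no delay, the record stamped 12 closes the window [0, 10) before the late record arrives, so the late value is missed. With acceptedDelay=3 the window waits for a timestamp of at least 13 and the late record is included.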
keyPurgeDaily (optional) is a Boolean value determining if existing data groups are automatically removed when newer data of a subsequent calendar day is ingested. The default value is true. If set to false, groups of the previous calendar day are retained.
Examples
if (!existsCatalog("orca")) {
createCatalog("orca")
}
go
use catalog orca
// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('DailyTimeSeries')
g = createStreamGraph('DailyTimeSeries')
g.source("trades", 1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT])
.dailyTimeSeriesEngine(windowSize=60, step=60, metrics=<[sum(volume)]>, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00, timeColumn=`date`second, mergeSessionEnd=true)
.sink("output")
g.submit()
go
dates=take(date(2025.05.07),8)
seconds=[09:25:31, 09:26:01, 09:30:02, 09:30:10, 11:29:46, 11:29:50, 11:30:00, 11:30:01]
syms=["A", "B", "A", "B", "A", "B", "B", "A"]
volumes=[8, 10, 26, 14, 30, 11, 14, 4]
tmp=table(dates as date, seconds as second, syms as sym, volumes as volume)
appendOrcaStreamTable("trades", tmp)
select * from orca_table.output
| concatDateTime | sum_volume |
|---|---|
| 2025.05.07 09:31:00 | 58 |
| 2025.05.07 11:30:00 | 55 |