DStream::sessionWindowEngine

Syntax

DStream::sessionWindowEngine(sessionGap, metrics, [timeColumn], [useSystemTime=false], [keyColumn], [updateTime], [useSessionStartTime=true], [forceTriggerTime])

Details

Creates a session window streaming engine. For details, see createSessionWindowEngine.

Return value: A DStream object.

Arguments

sessionGap is a positive integer indicating the gap between two session windows. Its unit is determined by the parameter useSystemTime.
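To make the role of sessionGap concrete, here is a minimal plain-Python model (not DolphinDB code) that splits the sorted timestamps of one key into sessions. The function name split_sessions is hypothetical, and the boundary rule used here (a record starts a new session when it arrives sessionGap or more after the previous record) is an assumption; the engine's handling of a gap exactly equal to sessionGap may differ.

```python
def split_sessions(timestamps, session_gap):
    """Group sorted timestamps into sessions separated by a gap >= session_gap."""
    sessions = []
    for t in timestamps:
        # Assumption: a gap of session_gap or more closes the current session.
        if sessions and t - sessions[-1][-1] < session_gap:
            sessions[-1].append(t)   # still inside the current session
        else:
            sessions.append([t])     # first record, or gap reached: new session
    return sessions


# Millisecond offsets for a single key, with session_gap = 5
print(split_sessions([1, 4, 11, 14, 21, 28], 5))
# prints [[1, 4], [11, 14], [21], [28]]
```

Records 3 ms apart stay in one session, while the 7 ms gaps each start a new one.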

metrics is metacode or a tuple specifying the calculation formulas. For more information about metacode, please refer to Metaprogramming.
  • It can use one or more built-in or user-defined aggregate functions (which must be defined by the defg keyword) such as <[sum(volume), avg(price)]>, or expressions of aggregate functions such as <[avg(price1)-avg(price2)]>, or aggregate functions involving multiple columns such as <[std(price1-price2)]>.
  • You can specify functions that return multiple values for metrics, such as <func(price) as `col1`col2> (it's optional to specify the column names).
  • If metrics is a tuple with multiple formulas, windowSize is specified as a vector of the same length as metrics. Each element of windowSize corresponds to the element at the same position in metrics. For example, if windowSize=[10,20], metrics can be (<[min(volume), max(volume)]>, <sum(volume)>). metrics can also be a nested tuple, such as [[<[min(volume), max(volume)]>, <sum(volume)>], [<avg(volume)>]].
Note:
  • The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.
  • Nested aggregate function calls are not supported in metrics.

timeColumn (optional) is a STRING scalar or vector specifying the time column(s) of the subscribed stream table. When useSystemTime = false, it must be specified.

useSystemTime (optional) is a Boolean value indicating whether the calculations are performed based on the system time when data is ingested into the engine.
  • useSystemTime = true: the engine windows the streaming data at fixed time intervals according to the ingestion time of each record (local system time with millisecond precision, independent of any temporal columns in the stream table). As long as a window contains data, the calculation is performed automatically when the window ends. The first column of the output table indicates the timestamp when the calculation occurred.
  • useSystemTime = false (default): the engine will window the streaming data according to the timeColumn in the stream table. The calculation for a window is triggered by the first record after the previous window. Note that the record which triggers the calculation will not participate in this calculation.
For example, consider a window that ranges from 10:10:10 to 10:10:19. If useSystemTime = true and the window is not empty, the calculation is triggered at 10:10:20. If useSystemTime = false and the first record after 10:10:19 arrives at 10:10:25, the calculation is triggered at 10:10:25.

keyColumn (optional) is a STRING scalar/vector indicating the name of the grouping column(s). If it is specified, the engine conducts the calculations within each group. For example, group the data by stock symbol and apply moving aggregation functions to each stock.

updateTime (optional) is a non-negative integer which takes the same time precision as timeColumn. It is used to trigger window calculations at an interval shorter than sessionGap. sessionGap must be a multiple of updateTime. To specify updateTime, useSystemTime must be set to false.

useSessionStartTime (optional) is a Boolean value indicating whether the first column of the output table is the starting time of each window, i.e., the timestamp of the first record in the window. Setting it to false means the timestamps in the output table are the ending times of the windows, i.e., the timestamp of the last record in the window + sessionGap. If updateTime is specified, useSessionStartTime must be true.
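The two labeling choices can be illustrated with a short plain-Python sketch (not DolphinDB code). The function name window_label and the use of integer millisecond offsets are assumptions made for illustration; the arithmetic follows the description above.

```python
def window_label(first_ts, last_ts, session_gap, use_session_start_time=True):
    """Return the timestamp written to the first output column for one window."""
    if use_session_start_time:
        # Label the window with the timestamp of its first record.
        return first_ts
    # Otherwise label it with its end: last record timestamp + session gap.
    return last_ts + session_gap


# A window whose records arrive at offsets 11 ms and 14 ms, with sessionGap = 5
print(window_label(11, 14, 5))         # prints 11
print(window_label(11, 14, 5, False))  # prints 19
```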

forceTriggerTime (optional) is a non-negative integer. Its unit is the same as the time precision of timeColumn. It indicates how long the engine waits before forcibly triggering the calculation of windows that have not yet been calculated in each group.

Examples

if (!existsCatalog("orca")) {
	createCatalog("orca")
}
go
use catalog orca

// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('engine')
g = createStreamGraph('engine')

g.source("trades", 1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT])
.sessionWindowEngine(sessionGap = 5, metrics = <sum(volume)>, timeColumn = `time, keyColumn=`sym)
.sink("output")
g.submit()
go

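// Batch 1: five records at 10:01:00.001 to 10:01:00.005; opens one window per symbol
n = 5
time = 2018.10.12T10:01:00.000 + (1..n)
sym = take(`A`B`C, n)
volume = (1..n) % 1000
tmp = table(time as time, sym as sym, volume as volume)
appendOrcaStreamTable("trades", tmp)

// Batch 2: starts 10 ms later; its first record per symbol closes that symbol's window from batch 1
n = 5
time = 2018.10.12T10:01:00.010 + (1..n)
sym = take(`A`B`C, n)
volume = (1..n) % 1000
tmp = table(time as time, sym as sym, volume as volume)
appendOrcaStreamTable("trades", tmp)

// Batch 3: the first three records close the windows from batch 2;
// the last three (offsets 8, 14, 20) close the windows opened by the first three
n = 6
time = 2018.10.12T10:01:00.020 + 1 2 3 8 14 20
sym = take(`A`B`C, n)
volume = (1..n) % 1000
tmp = table(time as time, sym as sym, volume as volume)
appendOrcaStreamTable("trades", tmp)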

select * from orca_table.output

time                    sym volume
2018.10.12 10:01:00.001 A   5
2018.10.12 10:01:00.002 B   7
2018.10.12 10:01:00.003 C   3
2018.10.12 10:01:00.011 A   5
2018.10.12 10:01:00.012 B   7
2018.10.12 10:01:00.013 C   3
2018.10.12 10:01:00.021 A   1
2018.10.12 10:01:00.022 B   2
2018.10.12 10:01:00.023 C   3
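
To show how the rows above arise, here is a minimal plain-Python model (not DolphinDB code) of a keyed session window with useSystemTime = false and useSessionStartTime = true. The names session_window_sum and batch are hypothetical, timestamps are reduced to millisecond offsets from 10:01:00.000, and two points are assumptions: a window is closed by the next record of the same key, and a gap of sessionGap or more counts as closing it. The record that closes a window is not included in that window's sum.

```python
def session_window_sum(records, session_gap):
    """Return (start_time, key, total) for each closed session, in trigger order.

    records: iterable of (time, key, value) tuples in arrival order.
    """
    open_sessions = {}  # key -> [start_time, last_time, running_total]
    results = []
    for t, key, value in records:
        session = open_sessions.get(key)
        # Assumption: a gap >= session_gap within the same key closes the window.
        if session is not None and t - session[1] >= session_gap:
            results.append((session[0], key, session[2]))
            session = None
        if session is None:
            # The triggering record starts the next window for this key.
            open_sessions[key] = [t, t, value]
        else:
            session[1] = t
            session[2] += value
    return results


def batch(base, offsets):
    """Build one batch: symbols cycle through A, B, C and volume counts up from 1."""
    return [(base + off, "ABC"[i % 3], i + 1) for i, off in enumerate(offsets)]


records = (
    batch(0, [1, 2, 3, 4, 5])
    + batch(10, [1, 2, 3, 4, 5])
    + batch(20, [1, 2, 3, 8, 14, 20])
)
for row in session_window_sum(records, session_gap=5):
    print(row)
```

Under these assumptions the model yields nine rows whose start offsets, symbols, and sums match the table above. The windows opened by the last three records (offsets 28, 34, and 40) stay open because no later record arrives for their symbols.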