createSessionWindowEngine

Syntax

createSessionWindowEngine(name, sessionGap, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [updateTime], [useSessionStartTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [raftGroup], [forceTriggerTime])

Details

This function creates a session window streaming engine. The engine shares most of its parameters with the time-series engine (createTimeSeriesEngine) and adds two of its own: sessionGap and useSessionStartTime.

For more application scenarios, see Streaming Engines.

Calculation Rules

A session window opens when its first record arrives and stays open as long as each new record arrives within sessionGap of the previous one. The window end equals the timestamp of the last received record + sessionGap. The calculation for a window is triggered when the first record after the window end arrives.

Note: If keyColumn is specified, the data is grouped by the values of that column, and the rules above are applied within each group independently.
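The rules above can be illustrated with a small simulation. The following is a minimal Python sketch, not DolphinDB code and not the engine's actual implementation. The function name session_window_sums is hypothetical, timestamps are plain integers (milliseconds), the metric is fixed to a sum, and the sketch assumes that a record whose timestamp equals the window end starts a new window.

```python
def session_window_sums(records, session_gap):
    """Simulate per-key session windows over (timestamp, key, value) records.

    Records are processed in arrival order. Returns one (window_start, key, sum)
    row per triggered window, in the order the windows are triggered.
    """
    open_windows = {}  # key -> [window_start, last_timestamp, running_sum]
    emitted = []
    for ts, key, value in records:
        win = open_windows.get(key)
        # Assumption: a record at or after (last timestamp + gap) closes the window.
        if win is not None and ts >= win[1] + session_gap:
            emitted.append((win[0], key, win[2]))
            win = None
        if win is None:
            # First record of a new session window for this key.
            open_windows[key] = [ts, ts, value]
        else:
            # Record falls inside the open window: extend it.
            win[1] = ts
            win[2] += value
    return emitted


# Two keys, gap of 5: the records at 11 and 12 arrive after the window ends
# (4 + 5 = 9 for A, 5 + 5 = 10 for B) and therefore trigger the calculation.
records = [(1, "A", 1), (2, "B", 2), (4, "A", 4), (5, "B", 5), (11, "A", 1), (12, "B", 2)]
rows = session_window_sums(records, session_gap=5)
# rows == [(1, 'A', 5), (2, 'B', 7)]
```

The windows opened by the records at 11 and 12 stay open in this sketch, because no later record arrives to trigger them.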

Arguments

Most parameters of createSessionWindowEngine are identical to those of createTimeSeriesEngine. Only the parameters that differ are described here.

sessionGap is a positive integer indicating the gap between two session windows. Its unit is determined by the parameter useSystemTime.

useSessionStartTime (optional) is a Boolean value indicating whether the first column of outputTable holds the start time of each window, i.e., the timestamp of the first record in the window. The default value is true. If it is set to false, the first column holds the end time of each window, i.e., the timestamp of the last record in the window + sessionGap. If updateTime is specified, useSessionStartTime must be true.
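The two labeling options can be made concrete with a short calculation. This is a minimal Python sketch of the arithmetic only. The helper name output_timestamp is hypothetical and not part of DolphinDB, and the sample window (records at 10:01:00.001 and 10:01:00.004 with a gap of 5 ms) is chosen purely for illustration.

```python
from datetime import datetime, timedelta


def output_timestamp(first_ts, last_ts, session_gap, use_session_start_time=True):
    """Return the timestamp that labels a session window in the output."""
    if use_session_start_time:
        # Window start: timestamp of the first record in the window.
        return first_ts
    # Window end: timestamp of the last record in the window + sessionGap.
    return last_ts + session_gap


first = datetime(2018, 10, 12, 10, 1, 0, 1000)   # 10:01:00.001
last = datetime(2018, 10, 12, 10, 1, 0, 4000)    # 10:01:00.004
gap = timedelta(milliseconds=5)

start_label = output_timestamp(first, last, gap)
end_label = output_timestamp(first, last, gap, use_session_start_time=False)

print(start_label.isoformat(timespec="milliseconds"))  # 2018-10-12T10:01:00.001
print(end_label.isoformat(timespec="milliseconds"))    # 2018-10-12T10:01:00.009
```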

forceTriggerTime (optional) is a non-negative integer. Its unit is the same as the time precision of timeColumn. It specifies how long the engine waits before forcing the calculation of windows that have not yet been calculated in each group.

Examples

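// Stream table that receives the input records, and a shared table that stores the results
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
share table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT]) as output1
// Session windows per sym with sessionGap = 5, computing sum(volume) for each window
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym)
// Feed every record published to trades into the engine
subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!{engine_sw}, msgAsTable=true)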

// Batch 1: five records at 10:01:00.001 through 10:01:00.005
n = 5
timev = 2018.10.12T10:01:00.000 + (1..n)
symv=take(`A`B`C,n)
volumev = (1..n)%1000
insert into trades values(timev, symv, volumev)

// Batch 2: five records at 10:01:00.011 through 10:01:00.015
n = 5
timev = 2018.10.12T10:01:00.010 + (1..n)
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)

// Batch 3: six unevenly spaced records at .021, .022, .023, .028, .034, and .040
n = 6
timev = 2018.10.12T10:01:00.020 + 1 2 3 8 14 20
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)

select * from output1;
time                    sym sumVolume
2018.10.12T10:01:00.001 A   5
2018.10.12T10:01:00.002 B   7
2018.10.12T10:01:00.003 C   3
2018.10.12T10:01:00.011 A   5
2018.10.12T10:01:00.012 B   7
2018.10.12T10:01:00.013 C   3
2018.10.12T10:01:00.021 A   1
2018.10.12T10:01:00.022 B   2
2018.10.12T10:01:00.023 C   3

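The last window of each group (starting at 10:01:00.028 for A, 10:01:00.034 for B, and 10:01:00.040 for C) is missing from the result above because no later record arrives to trigger its calculation. To output these windows, set forceTriggerTime to 1000: calculation is then triggered in all groups 1000 ms after the last record is ingested. Replace the engine creation statement with the following code: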

engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym, forceTriggerTime=1000)

Rerun the example and query the output table again. The result is as follows:

time                    sym sumVolume
2018.10.12T10:01:00.001 A   5
2018.10.12T10:01:00.002 B   7
2018.10.12T10:01:00.003 C   3
2018.10.12T10:01:00.011 A   5
2018.10.12T10:01:00.012 B   7
2018.10.12T10:01:00.013 C   3
2018.10.12T10:01:00.021 A   1
2018.10.12T10:01:00.022 B   2
2018.10.12T10:01:00.023 C   3
2018.10.12T10:01:00.028 A   4
2018.10.12T10:01:00.034 B   5
2018.10.12T10:01:00.040 C   6
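To see where the last three rows of the result above come from, the example can be replayed in a small simulation. This is a minimal Python sketch, not DolphinDB code. The class SessionWindowSketch and its methods are hypothetical, timestamps are millisecond offsets from 10:01:00.000, and a record at exactly the window end is assumed to start a new window. The force_trigger method models only the effect of a forced trigger (emitting every window that is still open), not its timing, and the order in which it emits groups is an assumption.

```python
class SessionWindowSketch:
    """Toy model of per-key session windows with a sum metric."""

    def __init__(self, session_gap):
        self.session_gap = session_gap
        self.open_windows = {}  # key -> [window_start, last_timestamp, running_sum]
        self.output = []        # rows of (window_start, key, sum)

    def append(self, ts, key, value):
        win = self.open_windows.get(key)
        # A record at or after the window end triggers the open window.
        if win is not None and ts >= win[1] + self.session_gap:
            self.output.append((win[0], key, win[2]))
            win = None
        if win is None:
            self.open_windows[key] = [ts, ts, value]
        else:
            win[1] = ts
            win[2] += value

    def force_trigger(self):
        # Emit every window that has not been calculated yet.
        for key, (start, _last, total) in self.open_windows.items():
            self.output.append((start, key, total))
        self.open_windows.clear()


# The three batches from the example, as offsets in ms after 10:01:00.000.
times = list(range(1, 6)) + list(range(11, 16)) + [21, 22, 23, 28, 34, 40]
syms = list("ABCAB") * 2 + list("ABCABC")
vols = [1, 2, 3, 4, 5] * 2 + [1, 2, 3, 4, 5, 6]

sketch = SessionWindowSketch(session_gap=5)
for ts, sym, vol in zip(times, syms, vols):
    sketch.append(ts, sym, vol)

triggered_by_records = list(sketch.output)   # the first nine rows
sketch.force_trigger()
forced_rows = sketch.output[len(triggered_by_records):]
# forced_rows == [(28, 'A', 4), (34, 'B', 5), (40, 'C', 6)]
```

Under these assumptions, the nine rows triggered by incoming records match the first result table, and the forced rows match the three additional rows in the second one.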