createSessionWindowEngine
Syntax
createSessionWindowEngine(name, sessionGap, metrics, dummyTable, outputTable,
[timeColumn], [useSystemTime=false], [keyColumn], [updateTime],
[useSessionStartTime=true], [snapshotDir], [snapshotIntervalInMsgCount],
[raftGroup], [forceTriggerTime])
Details
This function creates a session window streaming engine. The session window engine shares most of its parameters with the time-series engine (createTimeSeriesEngine), but includes two unique parameters: sessionGap and useSessionStartTime.
For more application scenarios, see Streaming Engines.
Starting from version 2.00.11, array vectors are allowed in dummyTable and outputTable, but they cannot be involved in calculations specified in metrics.
Calculation Rules
When records are ingested into a session window, the window remains open until a specified period of inactivity passes (specified by sessionGap). The window end is equal to the timestamp of the last received record + the sessionGap interval. The calculation of this window is triggered by the arrival of the next record after the window ends.
Note: If keyColumn is specified to group data by the column values, the calculations described above will be performed within each group.
Arguments
As most of the parameters of createSessionWindowEngine
are identical
with those of createTimeSeriesEngine, we only explain the following parameters of
createSessionWindowEngine
that are different from those of
createTimeSeriesEngine
.
sessionGap a positive integer indicating the gap between 2 session windows. Its unit is determined by the parameter useSystemTime.
useSessionStartTime (optional) is a Boolean value indicating whether the first column in outputTable is the starting time of the windows, i.e., the timestamp of the first record in each window. Setting it to false means the timestamps in the output table are the ending time of the windows, i.e., timestamp of the last record in window + sessionGap. If updateTime is specified, useSessionStartTime must be true.
forceTriggerTime (optional) is a non-negative integer. Its unit is the same as the time precision of timeColumn. forceTriggerTime indicates the waiting time to force trigger calculation in uncalculated windows for each group.
Examples
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
share table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT]) as output1
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym)
subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!{engine_sw}, msgAsTable=true)
n = 5
timev = 2018.10.12T10:01:00.000 + (1..n)
symv=take(`A`B`C,n)
volumev = (1..n)%1000
insert into trades values(timev, symv, volumev)
n = 5
timev = 2018.10.12T10:01:00.010 + (1..n)
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)
n = 6
timev = 2018.10.12T10:01:00.020 + 1 2 3 8 14 20
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)
select * from output1;
time | sym | volume |
---|---|---|
2018.10.12T10:01:00.001 | A | 5 |
2018.10.12T10:01:00.002 | B | 7 |
2018.10.12T10:01:00.003 | C | 3 |
2018.10.12T10:01:00.011 | A | 5 |
2018.10.12T10:01:00.012 | B | 7 |
2018.10.12T10:01:00.013 | C | 3 |
2018.10.12T10:01:00.021 | A | 1 |
2018.10.12T10:01:00.022 | B | 2 |
2018.10.12T10:01:00.023 | C | 3 |
Specify forceTriggerTime as 1000. 1000 ms after the ingestion of the last record, calculation is triggered in all groups. Replace the engine creation statement with the following code:
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym, forceTriggerTime=1000)
Print the output table again. The result is as follows:
time | sym | volume |
---|---|---|
2018.10.12T10:01:00.001 | A | 5 |
2018.10.12T10:01:00.002 | B | 7 |
2018.10.12T10:01:00.003 | C | 3 |
2018.10.12T10:01:00.011 | A | 5 |
2018.10.12T10:01:00.012 | B | 7 |
2018.10.12T10:01:00.013 | C | 3 |
2018.10.12T10:01:00.021 | A | 1 |
2018.10.12T10:01:00.022 | B | 2 |
2018.10.12T10:01:00.023 | C | 3 |
2018.10.12T10:01:00.028 | A | 4 |
2018.10.12T10:01:00.034 | B | 5 |
2018.10.12T10:01:00.040 | C | 6 |