createSessionWindowEngine
Syntax
createSessionWindowEngine(name,
sessionGap, metrics, dummyTable, outputTable, [timeColumn],
[useSystemTime=false], [keyColumn], [updateTime], [useSessionStartTime=true],
[snapshotDir], [snapshotIntervalInMsgCount], [raftGroup],
[forceTriggerTime])
Details
This function creates a session window streaming engine. The session window engine shares most of its parameters with the time-series engine (createTimeSeriesEngine), but includes two unique parameters: sessionGap and useSessionStartTime.
For more application scenarios, see Streaming Engines.
Starting from version 2.00.11, array vectors are allowed in dummyTable and outputTable, but they cannot be involved in calculations specified in metrics.
Calculation Rules
When records are ingested into a session window, the window remains open until a specified period of inactivity passes (specified by sessionGap). The window end is equal to the timestamp of the last received record + the sessionGap interval. The calculation of this window is triggered by the arrival of the next record after the window ends.
Parameters
As most of the parameters of createSessionWindowEngine are identical
with those of createTimeSeriesEngine, we only explain the following parameters of
createSessionWindowEngine that are different from those of
createTimeSeriesEngine.
sessionGap a positive integer indicating the gap between 2 session windows. Its unit is determined by the parameter useSystemTime.
useSessionStartTime (optional) is a Boolean value indicating whether the first column in outputTable is the starting time of the windows, i.e., the timestamp of the first record in each window. Setting it to false means the timestamps in the output table are the ending time of the windows, i.e., timestamp of the last record in window + sessionGap. If updateTime is specified, useSessionStartTime must be true.
forceTriggerTime (optional) is a non-negative integer. Its unit is the same as the time precision of timeColumn. forceTriggerTime indicates the waiting time to force trigger calculation in uncalculated windows for each group.
Returns
A table object.
Examples
Example 1. The following example shows how to calculate trading volume by
stock symbol. First create a table “engine_sw” with
createSessionWindowEngine, then subscribe to the stream table
“trades” and writes data to the engine. The engine conducts calculations by group
based on column sym and saves the results to output1 .
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
share table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT]) as output1
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym)
subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!{engine_sw}, msgAsTable=true)
n = 5
timev = 2018.10.12T10:01:00.000 + (1..n)
symv=take(`A`B`C,n)
volumev = (1..n)%1000
insert into trades values(timev, symv, volumev)
n = 5
timev = 2018.10.12T10:01:00.010 + (1..n)
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)
n = 6
timev = 2018.10.12T10:01:00.020 + 1 2 3 8 14 20
volumev = (1..n)%1000
symv=take(`A`B`C,n)
insert into trades values(timev, symv, volumev)
select * from output1;
| time | sym | sumVolume |
|---|---|---|
| 2018.10.12T10:01:00.001 | A | 5 |
| 2018.10.12T10:01:00.002 | B | 7 |
| 2018.10.12T10:01:00.003 | C | 3 |
| 2018.10.12T10:01:00.011 | A | 5 |
| 2018.10.12T10:01:00.012 | B | 7 |
| 2018.10.12T10:01:00.013 | C | 3 |
| 2018.10.12T10:01:00.021 | A | 1 |
| 2018.10.12T10:01:00.022 | B | 2 |
| 2018.10.12T10:01:00.023 | C | 3 |
Specify forceTriggerTime as 1000. 1000 ms after the ingestion of the last record, calculation is triggered in all groups. Replace the engine creation statement with the following code:
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym, forceTriggerTime=1000)
Print the output table again. The result is as follows:
| time | sym | sumVolume |
|---|---|---|
| 2018.10.12T10:01:00.001 | A | 5 |
| 2018.10.12T10:01:00.002 | B | 7 |
| 2018.10.12T10:01:00.003 | C | 3 |
| 2018.10.12T10:01:00.011 | A | 5 |
| 2018.10.12T10:01:00.012 | B | 7 |
| 2018.10.12T10:01:00.013 | C | 3 |
| 2018.10.12T10:01:00.021 | A | 1 |
| 2018.10.12T10:01:00.022 | B | 2 |
| 2018.10.12T10:01:00.023 | C | 3 |
| 2018.10.12T10:01:00.028 | A | 4 |
| 2018.10.12T10:01:00.034 | B | 5 |
| 2018.10.12T10:01:00.040 | C | 6 |
Example 2. The following example demonstrates using the engine to capture the last status by sensor groups, and outputs the end time of each session window (last data time + sessionGap) by setting useSessionStartTime=false.
// Create sensor data table
share streamTable(1000:0, `time`sensor`status, [TIMESTAMP, SYMBOL, INT]) as sensors
share table(10000:0, `time`sensor`lastStatus, [TIMESTAMP, SYMBOL, INT]) as output2
// Create the engine with useSessionStartTime=false
engine_sw = createSessionWindowEngine(name = "sensor_engine", sessionGap = 5,
metrics = <last(status)>, dummyTable = sensors,
outputTable = output2, timeColumn = `time, keyColumn=`sensor,
useSessionStartTime=false)
subscribeTable(tableName="sensors", actionName="sensor_monitor",
offset=0, handler=append!{engine_sw}, msgAsTable=true)
// Insert into data
n = 5
timev = 2023.01.01T10:00:00.000 + (1..n)
sensorv = take(`SENSOR001`SENSOR002`SENSOR003, n)
statusv = rand(0..2, n) // 0=Offline, 1=Normal, 2=Fault
insert into sensors values(timev, sensorv, statusv)
n = 5
timev = 2023.01.01T10:00:00.010 + (1..n)
statusv = rand(0..2, n)
sensorv = take(`SENSOR001`SENSOR002`SENSOR003, n)
insert into sensors values(timev, sensorv, statusv)
n = 6
timev = 2023.01.01T10:00:00.020 + 1 2 3 8 14 20
statusv = rand(0..2, n)
sensorv = take(`SENSOR001`SENSOR002`SENSOR003, n)
insert into sensors values(timev, sensorv, statusv)
select * from output2;
// Output:
print(output2)
time sensor lastStatus
----------------------- --------- ----------
2023.01.01T10:00:00.008 SENSOR003 0
2023.01.01T10:00:00.009 SENSOR001 2
2023.01.01T10:00:00.010 SENSOR002 1
2023.01.01T10:00:00.018 SENSOR003 1
2023.01.01T10:00:00.019 SENSOR001 2
2023.01.01T10:00:00.020 SENSOR002 1
2023.01.01T10:00:00.026 SENSOR001 2
2023.01.01T10:00:00.027 SENSOR002 0
2023.01.01T10:00:00.028 SENSOR003 1
