会话窗口引擎
会话窗口引擎由 createSessionWindowEngine
函数创建。其语法如下:
createSessionWindowEngine(name, sessionGap, metrics,
dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn],
[updateTime], [useSessionStartTime=true], [snapshotDir],
[snapshotIntervalInMsgCount], [raftGroup], [forceTriggerTime])
createSessionWindowEngine
的参数绝大多数与
createTimeSeriesEngine
一样,只有 sessionGap 和
useSessionStartTime 是它独有的参数。sessionGap
决定了一个会话窗口何时结束,useSessionStartTime 决定了输出表时间列的时间为各个窗口的起始时刻还是结束时刻。
其他参数的详细含义可以参考:createSessionWindowEngine。
计算规则
若某条数据之后,经过 sessionGap 指定的时间长度内,没有新数据到来,就进行一次窗口截断(以截断前最后一条数据的时间戳 + sessionGap 作为窗口的结束时刻)。窗口结束后新到来的一条数据将触发该窗口的计算。
应用例子 1
share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT]) engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym) subscribeTable(tableName="trades", actionName="append_engine_sw", offset=0, handler=append!{engine_sw}, msgAsTable=true) n = 5 timev = 2018.10.12T10:01:00.000 + (1..n) symv=take(`A`B`C,n) volumev = (1..n)%1000 insert into trades values(timev, symv, volumev) n = 5 timev = 2018.10.12T10:01:00.010 + (1..n) volumev = (1..n)%1000 symv=take(`A`B`C,n) insert into trades values(timev, symv, volumev) n = 6 timev = 2018.10.12T10:01:00.020 + 1 2 3 8 14 20 volumev = (1..n)%1000 symv=take(`A`B`C,n) insert into trades values(timev, symv, volumev) select * from output1;
输出返回:
time |
sym |
volume |
---|---|---|
2018.10.12T10:01:00.001 | A | 5 |
2018.10.12T10:01:00.002 | B | 7 |
2018.10.12T10:01:00.003 | C | 3 |
2018.10.12T10:01:00.011 | A | 5 |
2018.10.12T10:01:00.012 | B | 7 |
2018.10.12T10:01:00.013 | C | 3 |
2018.10.12T10:01:00.021 | A | 1 |
2018.10.12T10:01:00.022 | B | 2 |
2018.10.12T10:01:00.023 | C | 3 |
指定 forceTriggerTime 为1000ms,收到最后一条消息后,经过 1000ms,触发所有分组数据计算输出。用以下代码替换上述引擎创建部分的代码。
engine_sw = createSessionWindowEngine(name = "engine_sw", sessionGap = 5, metrics = <sum(volume)>, dummyTable = trades, outputTable = output1, timeColumn = `time, keyColumn=`sym, forceTriggerTime=1000)
再次查询输出表,可以得到以下结果:
time |
sym |
volume |
---|---|---|
2018.10.12T10:01:00.001 | A | 5 |
2018.10.12T10:01:00.002 | B | 7 |
2018.10.12T10:01:00.003 | C | 3 |
2018.10.12T10:01:00.011 | A | 5 |
2018.10.12T10:01:00.012 | B | 7 |
2018.10.12T10:01:00.013 | C | 3 |
2018.10.12T10:01:00.021 | A | 1 |
2018.10.12T10:01:00.022 | B | 2 |
2018.10.12T10:01:00.023 | C | 3 |
2018.10.12T10:01:00.028 | A | 4 |
2018.10.12T10:01:00.034 | B | 5 |
2018.10.12T10:01:00.040 | C | 6 |