createDailyTimeSeriesEngine
语法
createDailyTimeSeriesEngine(name, windowSize, step,
metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false],
[keyColumn], [garbageSize], [updateTime], [useWindowStartTime],
[roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill='none'],
[sessionBegin], [sessionEnd], [mergeSessionEnd=false], [forceTriggerTime],
[raftGroup], [forceTriggerSessionEndTime], [keyPurgeFreqInSec], [closed='left'],
[outputElapsedMicroseconds=false], [subWindow],
[parallelism=1],[acceptedDelay=0], [outputHandler=NULL],
[msgAsTable=false], [keyPurgeDaily=true])
详情
创建流数据日级时间序列引擎。日级时间序列引擎和时间序列引擎窗口划分和计算规则基本一致,但在此基础上做了如下拓展:
- 该引擎只能在一个自然日的指定时间段内(以下统称为 session)进行窗口的聚合计算。一个自然日内,可以指定多个 session, 如 9:00-12:00,13:00-15:00 等。
- 出现在一个 session 开始之前的数据,日级时序引擎规定将该部分数据合入该 session 的第一个窗口进行计算。
- 当日最后一个 session 后到来的数据将被丢弃,不会计入第二天的第一个窗口中。
若指定了 keyColumn 进行分组,则上述计算将在各分组内独立进行。
更多流数据引擎的应用场景说明可以参考 内置流式计算引擎。
参数
该引擎是基于时间序列引擎进行的扩展,继承了 createTimeSeriesEngine
所有的参数,请参照
createTimeSeriesEngine
中参数介绍。这里仅介绍与时间序列引擎不同的参数:
sessionBegin 为可选参数,可以是与时间列的数据类型对应的 SECOND、TIME 或 NANOTIME 类型的标量或向量,表示每个时间段的起始时刻。如果 sessionBegin 是一个向量,它必须是递增的。
sessionEnd 为可选参数,可以是与时间列的数据类型对应的 SECOND、TIME 或 NANOTIME 类型的标量或向量,表示每个时间段的结束时刻。可在 sessionEnd 中指定 00:00:00 表示的次日的零点(即当日的 24:00:00)。
mergeSessionEnd 为可选参数,是一个布尔值。当 close = 'left' 时,表示每个 session 结束时刻(规整后)的数据是否合入最后一个窗口。默认值为 false,此时该条数据不会合入当前 session 的最后一个窗口,但可以触发最后一个窗口的计算;如果当前 session 不是该自然日内最后一个 session,则该数据会合入下个 session 的第一个窗口。
forceTriggerSessionEndTime 可选参数,正整数,单位与 timeColumn 的时间精度一致。若 sessionEnd 时刻对应的窗口数据长时间未发生计算,通过该参数可以设置系统经过多少时间后触发计算并输出。若不指定 fill ,未包含在该窗口内的分组不会输出结果;若指定了 fill ,未包含在该窗口内的分组会按照 fill 指定的方式输出结果。
keyPurgeDaily 为可选参数,是一个布尔值。默认值为 true,表示引擎在收到第一批包含新日期的数据时,先清空之前保存的所有分组,再对这批新数据进行处理。若设置为 false,则引擎不会清理前一天的分组。
例子
例1. 设置 mergeSessionEnd,将每个 session 结束时刻的数据合入最后一个窗口。
share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
share keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT]) as output1
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50, updateTime=2, useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00,mergeSessionEnd=true)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);
insert into trades values(2018.10.08,09:25:31,`A,8)
insert into trades values(2018.10.08,09:26:01,`B,10)
insert into trades values(2018.10.08,09:30:02,`A,26)
insert into trades values(2018.10.08,09:30:10,`B,14)
insert into trades values(2018.10.08,11:29:46,`A,30)
insert into trades values(2018.10.08,11:29:50,`B,11)
insert into trades values(2018.10.08,11:30:00,`A,14)
insert into trades values(2018.10.08,11:30:00,`B,4)
insert into trades values(2018.10.08,13:00:10,`A,16)
insert into trades values(2018.10.08,13:00:12,`B,9)
insert into trades values(2018.10.08,14:59:56,`A,20)
insert into trades values(2018.10.08,14:59:58,`B,20)
insert into trades values(2018.10.08,15:00:00,`A,10)
insert into trades values(2018.10.08,15:00:00,`B,29)
sleep(1000)
select * from output1
time | sym | sumVolume |
---|---|---|
2018.10.08T09:31:00 | A | 34 |
2018.10.08T09:31:00 | B | 24 |
2018.10.08T11:30:00 | A | 44 |
2018.10.08T11:30:00 | B | 15 |
2018.10.08T13:01:00 | A | 16 |
2018.10.08T13:01:00 | B | 9 |
2018.10.08T15:00:00 | A | 30 |
2018.10.08T15:00:00 | B | 49 |
例2. 设置 forceTriggerSessionEndTime,达到系统时间,强制触发计算。
share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
share keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT]) as output1
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00,mergeSessionEnd=true,forceTriggerSessionEndTime=10)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);
insert into trades values(date(now()),09:25:31,`A,8)
insert into trades values(date(now()),09:26:01,`B,10)
insert into trades values(date(now()),09:30:02,`A,26)
insert into trades values(date(now()),09:30:10,`B,14)
insert into trades values(date(now()),11:29:46,`A,30)
insert into trades values(date(now()),11:29:50,`B,11)
insert into trades values(date(now()),11:30:00,`B,14)
insert into trades values(date(now()),11:30:01,`A,4)
select * from output1
time | sym | sumVolume |
---|---|---|
2022.03.24T09:31:00 | A | 34 |
2022.03.24T09:31:00 | B | 24 |
2022.03.24T11:30:00 | A | 30 |
设置 forceTriggerSessionEndTime = 10,则系统到达 11:30:00 后,再经过 10s 就会触发右边界为11:30:00的窗口内数据的计算。
sleep(10000)
select * from output1
time | sym | sumVolume |
---|---|---|
2022.03.24T09:31:00 | A | 34 |
2022.03.24T09:31:00 | B | 24 |
2022.03.24T11:30:00 | A | 30 |
2022.03.24T11:30:00 | B | 25 |
// 清理变量
dropStreamEngine("engine1")
unsubscribeTable(tableName="trades", actionName="engine1")
undef(`trades, SHARED)
undef(`output1,SHARED)
share streamTable(1000:0, `date`time`sym`volume, [DATE, TIME, SYMBOL, INT]) as trades
share keyedTable(`timestamp`sym, 10000:0, `timestamp`sym`sumVolume, [TIMESTAMP, SYMBOL, INT]) as output1
//创建引擎,指定窗口长度为 10 分钟。最后一个 sessionEnd 是 14:57:00
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=600000, step=600000,
metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`time, garbageSize=50, updateTime=2,
useSystemTime=false, keyColumn=`sym, useWindowStartTime=false, mergeSessionEnd=true,
sessionBegin=09:30:00.000 13:00:00.000, sessionEnd=11:30:00.000 14:57:00.000, roundTime=false)
subscribeTable(tableName="trades", actionName="engine1", offset=0,
handler=append!{engine1}, msgAsTable=true);
// 模拟数据插入流表。
// 最后一条数据时间是 14:56:00,按照一分钟规则进行规整,则该条数据将输出到 14:57:00 的窗口中
insert into trades values(2024.09.10,14:00:10.988,`A,16)
insert into trades values(2024.09.10,14:00:12.458,`B,9)
insert into trades values(2024.09.10,14:21:10.772,`A,13)
insert into trades values(2024.09.10,14:22:12.090,`B,15)
insert into trades values(2024.09.10,14:29:56.953,`A,20)
insert into trades values(2024.09.10,14:29:58.537,`B,20)
insert into trades values(2024.09.10,14:31:00.612,`A,10)
insert into trades values(2024.09.10,14:56:00.000,`B,29)
sleep(1000)
select * from output1
timestamp | sym | sumVolume |
---|---|---|
2024.09.10T14:10:00.000 | A | 16 |
2024.09.10T14:10:00.000 | B | 9 |
2024.09.10T14:30:00.000 | A | 33 |
2024.09.10T14:30:00.000 | B | 35 |
2024.09.10T14:40:00.000 | A | 10 |
2024.09.10T14:57:00.000 | B | 29 |
例4.
设置 keyPurgeDaily=false, 则在收到第一批日期是 2024.09.11 的数据时,引擎不会清空日期是 2024.09.10 的分组数据。
share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
share table(10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT]) as output1
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=30*60, step=30*60, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00,mergeSessionEnd=true, keyPurgeDaily=false, fill="null", forceTriggerTime=60)
insert into engine1 values(2024.09.10,13:00:10,`A,16)
insert into engine1 values(2024.09.10,13:00:12,`B,9)
insert into engine1 values(2024.09.10,13:00:12,`C,9)
insert into engine1 values(2024.09.10,14:59:56,`A,20)
insert into engine1 values(2024.09.10,14:59:58,`B,20)
insert into engine1 values(2024.09.10,15:00:00,`A,10)
insert into engine1 values(2024.09.10,15:00:00,`B,29)
insert into engine1 values(2024.09.11,09:30:02,`A,26)
insert into engine1 values(2024.09.11,09:30:10,`B,14)
insert into engine1 values(2024.09.11,10:30:46,`A,30)
insert into engine1 values(2024.09.11,10:30:50,`B,11)
select * from output1
time |
sym |
sumVolume |
---|---|---|
2024.09.10T13:30:00 | A | 16 |
2024.09.10T13:30:00 | B | 9 |
2024.09.10T13:30:00 | C | 9 |
2024.09.10T14:00:00 | A | 11 |
2024.09.10T14:00:00 | B | 30 |
2024.09.10T14:00:00 | C | 13 |
2024.09.10T14:30:00 | A | 20 |
2024.09.10T14:30:00 | B | 20 |
2024.09.10T14:30:00 | C | 10 |
2024.09.10T15:00:00 | A | 30 |
2024.09.10T15:00:00 | B | 49 |
2024.09.10T15:00:00 | C | |
2024.09.11T10:00:00 | A | 26 |
2024.09.11T10:00:00 | B | 14 |
2024.09.11T10:00:00 | C | |
2024.09.11T10:30:00 | A |
从上表的结果可以看出,2024.09.11 的数据中未包含 C 分组,但由于引擎未删除 2024.09.10 中的 C 分组,,因此填充结果中仍然出现了 C 分组。