createDailyTimeSeriesEngine

语法

createDailyTimeSeriesEngine(name, windowSize, step, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize], [updateTime], [useWindowStartTime], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill='none'], [sessionBegin], [sessionEnd], [mergeSessionEnd=false], [forceTriggerTime], [raftGroup], [forceTriggerSessionEndTime], [keyPurgeFreqInSec], [closed='left'], [outputElapsedMicroseconds=false], [subWindow], [parallelism=1],[acceptedDelay=0], [outputHandler=NULL], [msgAsTable=false])

详情

创建流数据日级时间序列引擎。日级时间序列引擎和时间序列引擎窗口划分和计算规则基本一致,但在此基础上做了如下拓展:

  • 该引擎只能在一个自然日的指定时间段内(以下统称为 session)进行窗口的聚合计算。一个自然日内,可以指定多个 session, 如 9:00-12:00,13:00-15:00 等。
  • 出现在一个 session 开始之前的数据,日级时序引擎规定将该部分数据合入该 session 的第一个窗口进行计算。
  • 当日最后一个 session 后到来的数据将被丢弃,不会计入第二天的第一个窗口中。

若指定了 keyColumn 进行分组,则上述计算将在各分组内独立进行。

更多流数据引擎的应用场景说明可以参考 内置流式计算引擎

参数

该引擎是基于时间序列引擎进行的扩展,继承了 createTimeSeriesEngine 所有的参数,请参照 createTimeSeriesEngine 中参数介绍。这里仅介绍与时间序列引擎不同的参数:

updateTime 为可选参数,正整数。与 createTimeSeriesEngine updateTime 的区别在于,createDailyTimeSeriesEngine 不支持 updateTime = 0。除此之外其它功能相同。

sessionBegin 为可选参数,可以是与时间列的数据类型对应的 SECOND、TIME 或 NANOTIME 类型的标量或向量,表示每个时间段的起始时刻。如果 sessionBegin 是一个向量,它必须是递增的。

sessionEnd 为可选参数,可以是与时间列的数据类型对应的 SECOND、TIME 或 NANOTIME 类型的标量或向量,表示每个时间段的结束时刻。可在 sessionEnd 中指定 00:00:00 表示的次日的零点(即当日的 24:00:00)。

mergeSessionEnd 为可选参数,是一个布尔值。当 close = 'left' 时,表示每个 session 结束时刻(规整后)的数据是否合入最后一个窗口。默认值为 false,此时该条数据不会合入当前 session 的最后一个窗口,但可以触发最后一个窗口的计算;如果当前 session 不是该自然日内最后一个 session,则该数据会合入下个 session 的第一个窗口。

forceTriggerSessionEndTime 可选参数,正整数,单位与 timeColumn 的时间精度一致。若 sessionEnd 时刻对应的窗口数据长时间未发生计算,通过该参数可以设置系统经过多少时间后触发计算并输出。若不指定 fill ,未包含在该窗口内的分组不会输出结果;若指定了 fill ,未包含在该窗口内的分组会按照 fill 指定的方式输出结果。

注: 系统会对数据窗口的起始时间(sessionBeginsessionEnd)进行规整。

例子

share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
share keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT]) as output1
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50, updateTime=2, useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00,mergeSessionEnd=true)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

insert into trades values(2018.10.08,09:25:31,`A,8)
insert into trades values(2018.10.08,09:26:01,`B,10)
insert into trades values(2018.10.08,09:30:02,`A,26)
insert into trades values(2018.10.08,09:30:10,`B,14)
insert into trades values(2018.10.08,11:29:46,`A,30)
insert into trades values(2018.10.08,11:29:50,`B,11)
insert into trades values(2018.10.08,11:30:00,`A,14)
insert into trades values(2018.10.08,11:30:00,`B,4)
insert into trades values(2018.10.08,13:00:10,`A,16)
insert into trades values(2018.10.08,13:00:12,`B,9)
insert into trades values(2018.10.08,14:59:56,`A,20)
insert into trades values(2018.10.08,14:59:58,`B,20)
insert into trades values(2018.10.08,15:00:00,`A,10)
insert into trades values(2018.10.08,15:00:00,`B,29)

sleep(1000)
select * from output1
time sym sumVolume
2018.10.08T09:31:00 A 34
2018.10.08T09:31:00 B 24
2018.10.08T11:30:00 A 44
2018.10.08T11:30:00 B 15
2018.10.08T13:01:00 A 16
2018.10.08T13:01:00 B 9
2018.10.08T15:00:00 A 30
2018.10.08T15:00:00 B 49
share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
share keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT]) as output1
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00,mergeSessionEnd=true,forceTriggerSessionEndTime=10)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

insert into trades values(date(now()),09:25:31,`A,8)
insert into trades values(date(now()),09:26:01,`B,10)
insert into trades values(date(now()),09:30:02,`A,26)
insert into trades values(date(now()),09:30:10,`B,14)
insert into trades values(date(now()),11:29:46,`A,30)
insert into trades values(date(now()),11:29:50,`B,11)
insert into trades values(date(now()),11:30:00,`B,14)
insert into trades values(date(now()),11:30:01,`A,4)

select * from output1
time sym sumVolume
2022.03.24T09:31:00 A 34
2022.03.24T09:31:00 B 24
2022.03.24T11:30:00 A 30

设置 forceTriggerSessionEndTime = 10,则系统到达 11:30:00 后,再经过 10s 就会触发右边界为11:30:00的窗口内数据的计算。

sleep(10000)
select * from output1
time sym sumVolume
2022.03.24T09:31:00 A 34
2022.03.24T09:31:00 B 24
2022.03.24T11:30:00 A 30
2022.03.24T11:30:00 B 25