createDailyTimeSeriesEngine

Syntax

createDailyTimeSeriesEngine(name, windowSize, step, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize], [updateTime], [useWindowStartTime], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill='none'], [sessionBegin], [sessionEnd], [mergeSessionEnd=false], [forceTriggerTime], [raftGroup], [forceTriggerSessionEndTime], [keyPurgeFreqInSec=-1], [closed='left'], [outputElapsedMicroseconds=false], [subWindow], [parallelism=1], [acceptedDelay=0], [outputHandler=NULL], [msgAsTable=false], [keyPurgeDaily=true], [mergeLastWindow=false])

Details

This function creates a daily time-series streaming engine. Its windowing logic and calculation rules are similar to those of the time-series engine (createTimeSeriesEngine). The features specific to the daily time-series engine are:

  • Window calculations are performed only within specified time periods of each calendar day, known as sessions. A day can have multiple sessions, such as 9:00-12:00 and 13:00-15:00. The start and end times of each session are automatically aligned unless mergeLastWindow is set to true.

  • Data that arrives before the start of a session within a calendar day will be included in the calculation of the first window of that session.

  • Data that arrives after the end of the last session of that day will be discarded.

Note: If keyColumn is specified to group data by the column values, the calculations described above will be performed within each group.
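The routing rules above can be sketched as a small Python model. This is a simplified illustration, not DolphinDB's implementation: it assumes closed='left', step equal to windowSize, session boundaries that fall on window boundaries, mergeSessionEnd=true (as in Example 1), and times given as seconds since midnight of a single day. The helper names (hms, window_end, aggregate) are hypothetical, and the model only decides which window a record belongs to; it does not model when windows are triggered.

```python
from collections import defaultdict


def hms(h, m, s=0):
    """Convert a wall-clock time to seconds since midnight."""
    return h * 3600 + m * 60 + s


def window_end(t, sessions, window):
    """Return the right boundary of the window that a record at time t joins,
    or None if the record is discarded.

    sessions: increasing list of (begin, end) pairs in seconds since midnight.
    Assumes closed='left', step == window and mergeSessionEnd=true.
    """
    for begin, end in sessions:
        if t < begin:
            # Data before a session (including between sessions) joins its first window.
            return begin + window
        if t < end:
            # Left-closed window [start, start + window) counted from the session start.
            return begin + ((t - begin) // window + 1) * window
        if t == end:
            # mergeSessionEnd=true: a record at the session end joins the last window.
            return end
    # Data after the last session of the day is discarded.
    return None


def aggregate(trades, sessions, window):
    """Sum volume per (window right boundary, symbol), like sum(volume) grouped by sym."""
    sums = defaultdict(int)
    for t, sym, volume in trades:
        end = window_end(t, sessions, window)
        if end is not None:
            sums[(end, sym)] += volume
    return dict(sums)


# Sessions and records from Example 1 (all on 2018.10.08).
SESSIONS = [(hms(9, 30), hms(11, 30)), (hms(13, 0), hms(15, 0))]
TRADES = [
    (hms(9, 25, 31), "A", 8), (hms(9, 26, 1), "B", 10),
    (hms(9, 30, 2), "A", 26), (hms(9, 30, 10), "B", 14),
    (hms(11, 29, 46), "A", 30), (hms(11, 29, 50), "B", 11),
    (hms(11, 30, 0), "A", 14), (hms(11, 30, 0), "B", 4),
    (hms(13, 0, 10), "A", 16), (hms(13, 0, 12), "B", 9),
    (hms(14, 59, 56), "A", 20), (hms(14, 59, 58), "B", 20),
    (hms(15, 0, 0), "A", 10), (hms(15, 0, 0), "B", 29),
]

result = aggregate(TRADES, SESSIONS, 60)
for (end, sym), total in sorted(result.items()):
    print(f"{end // 3600:02d}:{end % 3600 // 60:02d}:{end % 60:02d} {sym} {total}")
```

Under these assumptions the script prints the same eight sumVolume values as the output of Example 1, for example 09:31:00 A 34 (8 arrived before the session and joins the first window) and 11:30:00 A 44 (the 11:30:00 record is merged into the last window).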

For more application scenarios, see Streaming Engines.

Arguments

The daily time-series engine extends the time-series engine (createTimeSeriesEngine) and inherits all of its parameters. This section describes only the parameters specific to this engine.

sessionBegin (optional) is a scalar or vector of type SECOND, TIME, or NANOTIME, matching the data type of the time column, that specifies the start time of each session. If it is a vector, its values must be increasing.

sessionEnd (optional) is a scalar or vector of type SECOND, TIME, or NANOTIME, matching the data type of the time column, that specifies the end time of each session. Setting sessionEnd to 00:00:00 indicates the start of the next day (i.e., 24:00:00 of the current day).
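As a rough illustration of how sessionBegin and sessionEnd pair up, the hypothetical Python helper below turns the two vectors (given as seconds since midnight) into (begin, end) pairs and maps an end of 00:00:00 to 24:00:00. The equal-length check, reading "increasing" as strictly increasing, and rejecting sessions that end before they begin are assumptions made for this sketch, not documented engine validation rules.

```python
DAY_SECONDS = 24 * 3600


def normalize_sessions(begins, ends):
    """Pair sessionBegin/sessionEnd values given as seconds since midnight.

    Assumptions for this sketch: both vectors have the same length,
    begins are strictly increasing, and no session spans midnight except
    through the 00:00:00 end marker.
    """
    if len(begins) != len(ends):
        raise ValueError("sessionBegin and sessionEnd must have the same length")
    if any(later <= earlier for earlier, later in zip(begins, begins[1:])):
        raise ValueError("sessionBegin must be increasing")
    sessions = []
    for begin, end in zip(begins, ends):
        if end == 0:
            end = DAY_SECONDS  # 00:00:00 marks 24:00:00 of the current day
        if end <= begin:
            raise ValueError("each session must end after it begins")
        sessions.append((begin, end))
    return sessions


# 09:30-11:30 and 13:00-15:00, then an evening session ending at "00:00:00".
print(normalize_sessions([34200, 46800], [41400, 54000]))  # [(34200, 41400), (46800, 54000)]
print(normalize_sessions([75600], [0]))                    # [(75600, 86400)]
```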

mergeSessionEnd (optional) is a Boolean value that applies only when closed='left'. It determines whether a record whose timestamp equals the end of a session is included in the calculation of that session's last window. The default value is false: the record is not included in the last window, but it triggers that window's calculation. If the session is not the last session of the day, the record is then included in the calculation of the first window of the next session.
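The two settings of mergeSessionEnd can be summarized with a hypothetical Python function that reports which window a record stamped exactly at the end of session i joins. Treating the record as joining no window when it sits at the end of the day's last session with mergeSessionEnd=false is an assumption, based on the rule that data after the last session of the day is discarded. The function does not model triggering.

```python
from typing import Optional, Tuple


def route_session_end_record(
    i: int, n_sessions: int, merge_session_end: bool
) -> Optional[Tuple[str, int]]:
    """Decide which window a record stamped exactly at sessionEnd[i] joins.

    Returns ("last", i) for the last window of session i,
    ("first", i + 1) for the first window of the next session,
    or None if it joins no window (assumed for the day's last session).
    """
    if merge_session_end:
        # mergeSessionEnd=true: included in the last window of its own session.
        return ("last", i)
    if i + 1 < n_sessions:
        # mergeSessionEnd=false: it only triggers session i's last window
        # and is then counted in the first window of the next session.
        return ("first", i + 1)
    # mergeSessionEnd=false at the end of the last session: nothing to join.
    return None


# Two sessions per day, as in Example 1 (09:30-11:30 and 13:00-15:00).
print(route_session_end_record(0, 2, True))   # ('last', 0)  record at 11:30:00
print(route_session_end_record(0, 2, False))  # ('first', 1) record at 11:30:00
print(route_session_end_record(1, 2, False))  # None         record at 15:00:00
```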

forceTriggerSessionEndTime (optional) is a positive integer whose unit matches the precision of timeColumn. If the window containing sessionEnd has not been calculated when the session ends, the engine waits for the specified time and then forces the calculation of that window.

If a group receives no new data after its last window has been calculated while other groups keep receiving data, the fill parameter can be used to fill the empty windows of that group, so that the group's windows are still output up to the latest time point. If fill is not specified, no new windows are generated for that group after its last calculated window.

keyPurgeDaily (optional) is a Boolean value that determines whether existing groups are automatically removed when data from a later calendar day is ingested. The default value is true. If set to false, the groups from the previous calendar day are retained.
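A minimal Python sketch of the keyPurgeDaily behavior, assuming the engine simply tracks the set of group keys and compares each record's calendar date with the latest date seen so far. The class GroupRegistry is hypothetical; it ignores window state and only shows which groups survive a change of day.

```python
from datetime import date


class GroupRegistry:
    """Track group keys across calendar days (hypothetical model of keyPurgeDaily)."""

    def __init__(self, key_purge_daily=True):
        self.key_purge_daily = key_purge_daily
        self.current_day = None
        self.groups = set()

    def ingest(self, day, key):
        if self.current_day is None or day > self.current_day:
            # A record from a later calendar day starts a new day.
            if self.current_day is not None and self.key_purge_daily:
                self.groups.clear()  # keyPurgeDaily=true: drop the previous day's groups
            self.current_day = day
        self.groups.add(key)


# Groups as in Example 3: A, B, C on 2024.09.10, then only A and B on 2024.09.11.
records = [(date(2024, 9, 10), k) for k in "ABC"] + [(date(2024, 9, 11), k) for k in "AB"]

purging = GroupRegistry(key_purge_daily=True)
keeping = GroupRegistry(key_purge_daily=False)
for day, key in records:
    purging.ingest(day, key)
    keeping.ingest(day, key)

print(sorted(purging.groups))  # ['A', 'B']
print(sorted(keeping.groups))  # ['A', 'B', 'C']
```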

mergeLastWindow (optional) is a Boolean value. The default value is false. It determines how to handle a session whose length cannot be divided into windows of equal length (windowSize). When set to true, the engine keeps the original session start and end times (as defined by sessionBegin and sessionEnd) and merges the data of the last incomplete window (shorter than windowSize) into the preceding window for calculation. This parameter cannot be used together with subWindow.
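The window layout that mergeLastWindow produces can be sketched in Python as below. This is a simplified model under stated assumptions: times are seconds since midnight, step equals windowSize, windows are counted from the unaligned session start, and with mergeLastWindow=false the incomplete tail window is simply not produced (mirroring the behavior described in Example 4, not the engine's actual alignment logic). The names session_windows and fmt are hypothetical.

```python
def session_windows(begin, end, window, merge_last_window=False):
    """Return (start, end) windows covering a session [begin, end), in seconds."""
    windows = [(s, min(s + window, end)) for s in range(begin, end, window)]
    if not windows:
        return windows
    tail_start, tail_end = windows[-1]
    if tail_end - tail_start < window:  # the last window is incomplete
        tail = windows.pop()
        if merge_last_window:
            if windows:
                # Merge the incomplete tail into the preceding full window.
                prev_start, _ = windows.pop()
                windows.append((prev_start, tail_end))
            else:
                # Session shorter than one window: nothing to merge with (assumption).
                windows.append(tail)
    return windows


def fmt(sec):
    """Format seconds since midnight as HH:MM:SS."""
    return f"{sec // 3600:02d}:{sec % 3600 // 60:02d}:{sec % 60:02d}"


# Second session of Example 5: 13:00:00-15:00:30 with windowSize=60 seconds.
merged = session_windows(13 * 3600, 15 * 3600 + 30, 60, merge_last_window=True)
print(len(merged), fmt(merged[-1][0]), fmt(merged[-1][1]))  # 120 14:59:00 15:00:30
```

The last merged window, [14:59:00, 15:00:30), is 90 seconds long, matching the window described in Example 5.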

Examples

Example 1:

share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
share keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT]) as output1
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60,
  metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1,
  timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50,
  updateTime=2, useWindowStartTime=false, sessionBegin=09:30:00 13:00:00,
  sessionEnd=11:30:00 15:00:00, mergeSessionEnd=true)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

insert into trades values(2018.10.08,09:25:31,`A,8)
insert into trades values(2018.10.08,09:26:01,`B,10)
insert into trades values(2018.10.08,09:30:02,`A,26)
insert into trades values(2018.10.08,09:30:10,`B,14)
insert into trades values(2018.10.08,11:29:46,`A,30)
insert into trades values(2018.10.08,11:29:50,`B,11)
insert into trades values(2018.10.08,11:30:00,`A,14)
insert into trades values(2018.10.08,11:30:00,`B,4)
insert into trades values(2018.10.08,13:00:10,`A,16)
insert into trades values(2018.10.08,13:00:12,`B,9)
insert into trades values(2018.10.08,14:59:56,`A,20)
insert into trades values(2018.10.08,14:59:58,`B,20)
insert into trades values(2018.10.08,15:00:00,`A,10)
insert into trades values(2018.10.08,15:00:00,`B,29)

sleep(1000)
select * from output1
time sym sumVolume
2018.10.08T09:31:00 A 34
2018.10.08T09:31:00 B 24
2018.10.08T11:30:00 A 44
2018.10.08T11:30:00 B 15
2018.10.08T13:01:00 A 16
2018.10.08T13:01:00 B 9
2018.10.08T15:00:00 A 30
2018.10.08T15:00:00 B 49

Example 2:

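// clear variables
dropStreamEngine("engine1")
unsubscribeTable(tableName="trades", actionName="engine1")
undef(`trades, SHARED)
undef(`output1,SHARED)

share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades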
share keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT]) as output1
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60,
  metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1,
  timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50,
  useWindowStartTime=false, sessionBegin=09:30:00 13:00:00,
  sessionEnd=11:30:00 15:00:00, mergeSessionEnd=true, forceTriggerSessionEndTime=10)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

insert into trades values(date(now()),09:25:31,`A,8)
insert into trades values(date(now()),09:26:01,`B,10)
insert into trades values(date(now()),09:30:02,`A,26)
insert into trades values(date(now()),09:30:10,`B,14)
insert into trades values(date(now()),11:29:46,`A,30)
insert into trades values(date(now()),11:29:50,`B,11)
insert into trades values(date(now()),11:30:00,`B,14)
insert into trades values(date(now()),11:30:01,`A,4)

select * from output1
time sym sumVolume
2022.03.24T09:31:00 A 34
2022.03.24T09:31:00 B 24
2022.03.24T11:30:00 A 30

With forceTriggerSessionEndTime=10, calculation of the window whose right boundary is 11:30:00 is force-triggered 10 seconds after the system time reaches 11:30:00. Because the records use date(now()), the dates in the outputs depend on the day the script is run.

sleep(10000)
select * from output1
time sym sumVolume
2022.03.24T09:31:00 A 34
2022.03.24T09:31:00 B 24
2022.03.24T11:30:00 A 30
2022.03.24T11:30:00 B 25

Example 3:

Set keyPurgeDaily=false. When the engine receives data of 2024.09.11, it will not remove groups of 2024.09.10.

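// clear variables
dropStreamEngine("engine1")
unsubscribeTable(tableName="trades", actionName="engine1")
undef(`trades, SHARED)
undef(`output1,SHARED)

share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades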
share table(10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT]) as output1
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=30*60, step=30*60, 
  metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, 
  timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50,
  useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, 
  sessionEnd=11:30:00 15:00:00,mergeSessionEnd=true, keyPurgeDaily=false, 
  fill="null", forceTriggerTime=60)

insert into engine1 values(2024.09.10,13:00:10,`A,16)
insert into engine1 values(2024.09.10,13:00:12,`B,9)
insert into engine1 values(2024.09.10,13:00:12,`C,9)
insert into engine1 values(2024.09.10,14:59:56,`A,20)
insert into engine1 values(2024.09.10,14:59:58,`B,20)
insert into engine1 values(2024.09.10,15:00:00,`A,10)
insert into engine1 values(2024.09.10,15:00:00,`B,29)

insert into engine1 values(2024.09.11,09:30:02,`A,26)
insert into engine1 values(2024.09.11,09:30:10,`B,14)
insert into engine1 values(2024.09.11,10:30:46,`A,30)
insert into engine1 values(2024.09.11,10:30:50,`B,11)

select * from output1
time sym sumVolume
2024.09.10T13:30:00 A 16
2024.09.10T13:30:00 B 9
2024.09.10T13:30:00 C 9
2024.09.10T14:00:00 A
2024.09.10T14:00:00 B
2024.09.10T14:00:00 C
2024.09.10T14:30:00 A
2024.09.10T14:30:00 B
2024.09.10T14:30:00 C
2024.09.10T15:00:00 A 30
2024.09.10T15:00:00 B 49
2024.09.10T15:00:00 C
2024.09.11T10:00:00 A 26
2024.09.11T10:00:00 B 14
2024.09.11T10:00:00 C
2024.09.11T10:30:00 A

Because keyPurgeDaily=false, group C still appears in the results for 2024.09.11 even though no data for group C arrived on that day.

Example 4:

If the length of a session (sessionEnd minus sessionBegin) is not evenly divisible by step, the last window of the session is not output because it is shorter than the window size. To output this window, set roundTime=false so that windows are aligned on a one-minute basis.
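The arithmetic behind this example can be checked with a short Python snippet. The helper names ms and split_session are hypothetical; the snippet only computes how many full step-length windows fit into each session of the example below and how long the leftover piece is.

```python
def ms(h, m, s=0):
    """Milliseconds since midnight (the time column in this example is TIME)."""
    return ((h * 60 + m) * 60 + s) * 1000


def split_session(begin_ms, end_ms, step_ms):
    """Return (number of full windows, leftover milliseconds) for one session."""
    return divmod(end_ms - begin_ms, step_ms)


STEP = 600000  # 10 minutes, the windowSize and step used below
print(split_session(ms(9, 30), ms(11, 30), STEP))  # (12, 0): divides evenly
print(split_session(ms(13, 0), ms(14, 57), STEP))  # (11, 420000): a 7-minute remainder
```

The 7-minute remainder of the 13:00:00.000-14:57:00.000 session is the piece shorter than windowSize that the paragraph above refers to.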

// clear variables
dropStreamEngine("engine1")
unsubscribeTable(tableName="trades", actionName="engine1")
undef(`trades, SHARED)
undef(`output1,SHARED)

share streamTable(1000:0, `date`time`sym`volume, [DATE, TIME, SYMBOL, INT]) as trades
share keyedTable(`timestamp`sym, 10000:0, `timestamp`sym`sumVolume, [TIMESTAMP, SYMBOL, INT]) as output1

// Create the engine, specifying a window length of 10 minutes. The last sessionEnd is 14:57:00
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=600000, step=600000, 
  metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`time, garbageSize=50, updateTime=2, 
  useSystemTime=false, keyColumn=`sym, useWindowStartTime=false, mergeSessionEnd=true,
  sessionBegin=09:30:00.000 13:00:00.000, sessionEnd=11:30:00.000 14:57:00.000, roundTime=false)
  
subscribeTable(tableName="trades", actionName="engine1", offset=0, 
  handler=append!{engine1}, msgAsTable=true);

// Simulate data insertion into the stream table
// The last data point is at 14:56:00, which falls into the window ending at 14:57:00
insert into trades values(2024.09.10,14:00:10.988,`A,16)
insert into trades values(2024.09.10,14:00:12.458,`B,9)
insert into trades values(2024.09.10,14:21:10.772,`A,13)
insert into trades values(2024.09.10,14:22:12.090,`B,15)
insert into trades values(2024.09.10,14:29:56.953,`A,20)
insert into trades values(2024.09.10,14:29:58.537,`B,20)
insert into trades values(2024.09.10,14:31:00.612,`A,10)
insert into trades values(2024.09.10,14:56:00.000,`B,29)

sleep(1000)
select * from output1

Example 5:

Set the second session to 13:00:00-15:00:30 and set mergeLastWindow to true. The session start and end times are then not aligned, and the data of the last incomplete window [15:00:00, 15:00:30) is merged into the preceding window [14:59:00, 15:00:00), forming the window [14:59:00, 15:00:30) for calculation.

dropStreamEngine("engine1")
unsubscribeTable(tableName="trades", actionName="engine1")
undef(`trades, SHARED)
undef(`output1,SHARED)

share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
share keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT]) as output1
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60, timeColumn=`date`second,
  metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1,  
  useSystemTime=false, keyColumn=`sym, garbageSize=50, 
  useWindowStartTime=false, sessionBegin=09:00:00 13:00:00, 
  sessionEnd=11:30:00 15:00:30, roundTime=false, mergeLastWindow=true)
subscribeTable(tableName="trades", actionName="engine1", offset=0, 
  handler=append!{engine1}, msgAsTable=true);

insert into trades values(2018.10.08,13:00:10,`A,16)
insert into trades values(2018.10.08,13:00:12,`B,9)
insert into trades values(2018.10.08,14:29:56,`A,20)
insert into trades values(2018.10.08,14:29:58,`B,20)
insert into trades values(2018.10.08,14:31:00,`A,10)
insert into trades values(2018.10.08,14:55:00,`B,29)
insert into trades values(2018.10.08,14:56:00,`B,29)
insert into trades values(2018.10.08,14:57:01,`A,29)
insert into trades values(2018.10.08,14:57:01,`B,29)
insert into trades values(2018.10.08,14:59:01,`B,29)
insert into trades values(2018.10.08,14:59:01,`A,29)
insert into trades values(2018.10.08,15:00:01,`B,29)
insert into trades values(2018.10.08,15:00:01,`A,29)
insert into trades values(2018.10.08,15:00:31,`B,29)
insert into trades values(2018.10.08,15:00:31,`A,29)
sleep(2000)
select * from output1
time sym sumVolume
2018.10.08T13:01:00 A 16
2018.10.08T13:01:00 B 9
2018.10.08T14:30:00 A 20
2018.10.08T14:30:00 B 20
2018.10.08T14:56:00 B 29
2018.10.08T14:32:00 A 10
2018.10.08T14:57:00 B 29
2018.10.08T14:58:00 A 29
2018.10.08T14:58:00 B 29
2018.10.08T15:00:30 A 58
2018.10.08T15:00:30 B 58
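To see where these values come from, the following Python sketch replays the second-session records of Example 5 under a simplified model of mergeLastWindow: closed='left', windows counted from 13:00:00, the incomplete tail merged into the preceding window, records at or after 15:00:30 discarded, and triggering ignored. The function merged_window_end is hypothetical, not part of DolphinDB.

```python
from collections import defaultdict


def hms(h, m, s=0):
    """Convert a wall-clock time to seconds since midnight."""
    return h * 3600 + m * 60 + s


def merged_window_end(t, begin, end, window):
    """Right boundary of the window that a record at time t joins, or None if discarded."""
    if t < begin:
        return begin + window  # data before the session joins its first window
    if t >= end:
        return None  # data at or after the session end is treated as discarded (simplification)
    n_full, rem = divmod(end - begin, window)
    k = (t - begin) // window
    if rem and k >= n_full - 1:
        return end  # the last full window absorbs the incomplete tail
    return begin + (k + 1) * window


# Records of Example 5 (all on 2018.10.08, second session 13:00:00-15:00:30).
TRADES = [
    (hms(13, 0, 10), "A", 16), (hms(13, 0, 12), "B", 9),
    (hms(14, 29, 56), "A", 20), (hms(14, 29, 58), "B", 20),
    (hms(14, 31, 0), "A", 10), (hms(14, 55, 0), "B", 29),
    (hms(14, 56, 0), "B", 29), (hms(14, 57, 1), "A", 29),
    (hms(14, 57, 1), "B", 29), (hms(14, 59, 1), "B", 29),
    (hms(14, 59, 1), "A", 29), (hms(15, 0, 1), "B", 29),
    (hms(15, 0, 1), "A", 29), (hms(15, 0, 31), "B", 29),
    (hms(15, 0, 31), "A", 29),
]

BEGIN, END = hms(13, 0), hms(15, 0, 30)
sums = defaultdict(int)
for t, sym, volume in TRADES:
    right = merged_window_end(t, BEGIN, END, 60)
    if right is not None:
        sums[(right, sym)] += volume

print(sums[(END, "A")], sums[(END, "B")])  # 58 58
```

In this model the 14:59:01 and 15:00:01 records of each symbol land in the merged [14:59:00, 15:00:30) window (29 + 29 = 58), and the 15:00:31 records fall outside the session.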