createDailyTimeSeriesEngine

Syntax

createDailyTimeSeriesEngine(name, windowSize, step, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize], [updateTime], [useWindowStartTime], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill='none'], [sessionBegin], [sessionEnd], [mergeSessionEnd=false], [forceTriggerTime], [raftGroup], [forceTriggerSessionEndTime], [keyPurgeFreqInSec=-1], [closed='left'], [outputElapsedMicroseconds=false], [subWindow], [parallelism=1], [acceptedDelay=0], [outputHandler=NULL], [msgAsTable=false], [keyPurgeDaily=true])

Details

This function creates a daily time-series streaming engine. The windowing logic and calculation rules of the daily time-series engine are similar to those of the time-series engine. Features exclusive to the daily time-series engine are listed as follows:

  • Window calculations are performed only within a specified time period (known as a "session") of a calendar day. A day can have multiple sessions, such as 9:00-12:00, 13:00-15:00, and so on.

  • Data that arrives before the start of a session within a calendar day will be included in the calculation of the first window of that session.

  • Data that arrives after the end of the last session of that day will be discarded.

Note: If keyColumn is specified to group data by the column values, the calculations described above will be performed within each group.

For more application scenarios, see Streaming Engines.

Arguments

The daily time-series engine is an extension of the time-series engine (createTimeSeriesEngine) and inherits all of its parameters. In this section, we will only cover the parameters specific to this engine.

sessionBegin (optional) can be a scalar or vector of type SECOND, TIME or NANOTIME corresponding to the data type of the time column, indicating the starting time of each session. If it is a vector, it must be increasing.

sessionEnd (optional) can be a scalar or vector of type SECOND, TIME or NANOTIME corresponding to the data type of the time column, indicating the end time of each session. Specify sessionEnd as 00:00:00 to indicate the beginning of the next day (i.e., 24:00:00 of the current day).

mergeSessionEnd (optional) is a Boolean value. This parameter is only applicable when closed = 'left'. It determines whether the record arriving at the end of a session (which has been adjusted based on the alignment rules) will be included in the calculation of the last window of that session. The default value is false, which means the record will not be included in the last window but will trigger its calculation. If the current session is not the last session of the day, the record will participate in the calculation of the first window of the next session.

forceTriggerSessionEndTime (optional) is a positive integer. The unit of forceTriggerSessionEndTime is consistent with the precision of timeColumn. It indicates the waiting time to force trigger calculation in the window containing the sessionEnd, if it ends without calculation.

If no data is ingested into a group after the last window is calculated, and new data continues to ingest into other groups, the specified fill parameter can be used to fill the results of empty windows of that group. This ensures that the group's windows will still be output at the latest time point. If parameter fill is not specified, no new windows will be generated for that group after the calculation of the last window.

Note: The engine automatically adjusts the starting point of the window for sessionBegin and sessionEnd according to the value of alignmentSize.

acceptedDelay (optional) is a positive integer less than or equal to windowSize, specifying the maximum delay for each window to accept data.The default value is 0.

  • When useSystemTime=true, data received within the acceptedDelay time after the window ends will still be considered part of the current window and participate in the computation, and will not be included in the computation of the next window.

  • When useSystemTime=false, a window with t as right boundary will wait until a record with a timestamp equal to or later than t + acceptedDelay arrives. When such a record arrives, the current window closes and performs a calculation on all records within the window frame. This handles scenarios with out-of-order data.

outputHandler (optional) is a unary function or a partial function with a single unfixed parameter. If set, the engine will not write the calculation results to the output table directly. Instead, the results will be passed as a parameter to the outputHandler function. The default value is NULL, which means the result will be written to the output table.

msgAsTable (optional) is a Boolean scalar indicating whether the output data is passed into function (specified by outputHandler) as a table or as a tuple. If msgAsTable=true, the subscribed data is passed into function as a table. The default value is false, which means the output data is passed into function as a tuple of columns.

keyPurgeDaily (optional) is a Boolean value determining if existing data groups are automatically removed when newer data of a subsequent calendar day is ingested. The default value is true. If set to false, groups of the previous calendar day are retained.

Examples

Example 1:

share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
share keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT]) as output1
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50, updateTime=2, useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00,mergeSessionEnd=true)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

insert into trades values(2018.10.08,09:25:31,`A,8)
insert into trades values(2018.10.08,09:26:01,`B,10)
insert into trades values(2018.10.08,09:30:02,`A,26)
insert into trades values(2018.10.08,09:30:10,`B,14)
insert into trades values(2018.10.08,11:29:46,`A,30)
insert into trades values(2018.10.08,11:29:50,`B,11)
insert into trades values(2018.10.08,11:30:00,`A,14)
insert into trades values(2018.10.08,11:30:00,`B,4)
insert into trades values(2018.10.08,13:00:10,`A,16)
insert into trades values(2018.10.08,13:00:12,`B,9)
insert into trades values(2018.10.08,14:59:56,`A,20)
insert into trades values(2018.10.08,14:59:58,`B,20)
insert into trades values(2018.10.08,15:00:00,`A,10)
insert into trades values(2018.10.08,15:00:00,`B,29)

sleep(1000)
select * from output1
time sym sumVolume
2018.10.08T09:31:00 A 34
2018.10.08T09:31:00 B 24
2018.10.08T11:30:00 A 44
2018.10.08T11:30:00 B 15
2018.10.08T13:01:00 A 16
2018.10.08T13:01:00 B 9
2018.10.08T15:00:00 A 30
2018.10.08T15:00:00 B 49

Example 2:

share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
share keyedTable(`time`sym, 10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT]) as output1
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=60, step=60, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, sessionEnd=11:30:00 15:00:00,mergeSessionEnd=true,forceTriggerSessionEndTime=10)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

insert into trades values(date(now()),09:25:31,`A,8)
insert into trades values(date(now()),09:26:01,`B,10)
insert into trades values(date(now()),09:30:02,`A,26)
insert into trades values(date(now()),09:30:10,`B,14)
insert into trades values(date(now()),11:29:46,`A,30)
insert into trades values(date(now()),11:29:50,`B,11)
insert into trades values(date(now()),11:30:00,`B,14)
insert into trades values(date(now()),11:30:01,`A,4)

select * from output1
time sym sumVolume
2022.03.24T09:31:00 A 34
2022.03.24T09:31:00 B 24
2022.03.24T11:30:00 A 30

Set forceTriggerSessionEndTime = 10. Calculation on the window with the right boundary at 11:30:00 will be triggered 10 seconds after the system time reaches 11:30:00.

sleep(10000)
select * from output1
time sym sumVolume
2022.03.24T09:31:00 A 34
2022.03.24T09:31:00 B 24
2022.03.24T11:30:00 A 30
2022.03.24T11:30:00 B 25

Example 3:

Set keyPurgeDaily=false. When the engine receives data of 2024.09.11, it will not remove groups of 2024.09.10.

share streamTable(1000:0, `date`second`sym`volume, [DATE, SECOND, SYMBOL, INT]) as trades
share table(10000:0, `time`sym`sumVolume, [DATETIME, SYMBOL, INT]) as output1
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=30*60, step=30*60, 
  metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, 
  timeColumn=`date`second, useSystemTime=false, keyColumn=`sym, garbageSize=50,
  useWindowStartTime=false, sessionBegin=09:30:00 13:00:00, 
  sessionEnd=11:30:00 15:00:00,mergeSessionEnd=true, keyPurgeDaily=false, 
  fill="null", forceTriggerTime=60)

insert into engine1 values(2024.09.10,13:00:10,`A,16)
insert into engine1 values(2024.09.10,13:00:12,`B,9)
insert into engine1 values(2024.09.10,13:00:12,`C,9)
insert into engine1 values(2024.09.10,14:59:56,`A,20)
insert into engine1 values(2024.09.10,14:59:58,`B,20)
insert into engine1 values(2024.09.10,15:00:00,`A,10)
insert into engine1 values(2024.09.10,15:00:00,`B,29)

insert into engine1 values(2024.09.11,09:30:02,`A,26)
insert into engine1 values(2024.09.11,09:30:10,`B,14)
insert into engine1 values(2024.09.11,10:30:46,`A,30)
insert into engine1 values(2024.09.11,10:30:50,`B,11)

select * from output1
time sym sumVolume
2024.09.10T13:30:00 A 16
2024.09.10T13:30:00 B 9
2024.09.10T13:30:00 C 9
2024.09.10T14:00:00 A 11
2024.09.10T14:00:00 B 30
2024.09.10T14:00:00 C 13
2024.09.10T14:30:00 A 20
2024.09.10T14:30:00 B 20
2024.09.10T14:30:00 C 10
2024.09.10T15:00:00 A 30
2024.09.10T15:00:00 B 49
2024.09.10T15:00:00 C
2024.09.11T10:00:00 A 26
2024.09.11T10:00:00 B 14
2024.09.11T10:00:00 C
2024.09.11T10:30:00 A

It can be seen that group C is still present in the results for 2024.09.11, even though the data for this day does not include group C.

Example 4:

If the difference between sessionEnd and sessionBegin cannot be evenly divided by step, the last window of the session will not be output due to insufficient window size. To output the data for this window, you need to set roundTime = false, which will align the window according to the one-minute basis.

// clear variables
dropStreamEngine("engine1")
unsubscribeTable(tableName="trades", actionName="engine1")
undef(`trades, SHARED)
undef(`output1,SHARED)

share streamTable(1000:0, `date`time`sym`volume, [DATE, TIME, SYMBOL, INT]) as trades
share keyedTable(`timestamp`sym, 10000:0, `timestamp`sym`sumVolume, [TIMESTAMP, SYMBOL, INT]) as output1

// Create the engine, specifying a window length of 10 minutes. The last sessionEnd is 14:57:00
engine1 = createDailyTimeSeriesEngine(name="engine1", windowSize=600000, step=600000, 
  metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`date`time, garbageSize=50, updateTime=2, 
  useSystemTime=false, keyColumn=`sym,  useWindowStartTime=false, mergeSessionEnd=true,
  sessionBegin=09:30:00.000 13:00:00.000,  sessionEnd=11:30:00.000 14:57:00.000, roundTime=false)
  
subscribeTable(tableName="trades", actionName="engine1", offset=0, 
  handler=append!{engine1}, msgAsTable=true);

// Simulate data insertion into the stream table
// The last data point is at 14:56:00, which will be aligned to the 14:57:00 window
insert into trades values(2024.09.10,14:00:10.988,`A,16)
insert into trades values(2024.09.10,14:00:12.458,`B,9)
insert into trades values(2024.09.10,14:21:10.772,`A,13)
insert into trades values(2024.09.10,14:22:12.090,`B,15)
insert into trades values(2024.09.10,14:29:56.953,`A,20)
insert into trades values(2024.09.10,14:29:58.537,`B,20)
insert into trades values(2024.09.10,14:31:00.612,`A,10)
insert into trades values(2024.09.10,14:56:00.000,`B,29)

sleep(1000)
select * from output1