createTimeSeriesEngine

Syntax

createTimeSeriesEngine(name, windowSize, step, metrics, dummyTable, outputTable, [timeColumn], [useSystemTime=false], [keyColumn], [garbageSize], [updateTime], [useWindowStartTime], [roundTime=true], [snapshotDir], [snapshotIntervalInMsgCount], [fill='none'], [forceTriggerTime], [raftGroup], [keyPurgeFreqInSec=-1], [closed='left'], [outputElapsedMicroseconds=false], [subWindow], [parallelism=1], [acceptedDelay=0], [outputHandler=NULL], [msgAsTable=false])

Alias: createTimeSeriesAggregator

Details

This function creates a time-series streaming engine to conduct real-time time-series calculations with moving windows, and returns a table object where data is ingested for window calculations.

There are two types of aggregate operators in the time-series engine: incremental operators and full operators. Incremental operators incrementally aggregate the data as they arrive without keeping the historical data. Full operators (e.g., user-defined aggregate functions, unoptimized built-in aggregate functions, or functions with nested state functions) keep all the data in a window and recompute the output as a full refresh whenever new data arrives.

The following aggregate operators in the time-series engine are optimized for incremental computations: corr, covar, first, last, max, med, min, percentile, quantile, std, var, sum, sum2, sum3, sum4, wavg, wsum, count, firstNot, ifirstNot, lastNot, ilastNot, imax, imin, nunique, prod, sem, mode, searchK, beta, avg.
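The difference between the two operator types can be illustrated with a minimal Python sketch. It is a conceptual model only, not the engine's implementation; the class names IncrementalSum and FullRefresh are hypothetical.

```python
class IncrementalSum:
    """Incremental operator: keeps only a running total, not the records."""

    def __init__(self):
        self.total = 0

    def add(self, value):
        self.total += value
        return self.total


class FullRefresh:
    """Full operator: keeps every record in the window and recomputes from scratch."""

    def __init__(self, func):
        self.func = func
        self.values = []

    def add(self, value):
        self.values.append(value)
        return self.func(self.values)


inc = IncrementalSum()
full = FullRefresh(sum)
for v in [10, 28, 15]:
    inc_result = inc.add(v)
    full_result = full.add(v)

# Both produce the same aggregate, but only the full operator retains the data.
print(inc_result, full_result, len(full.values))  # 53 53 3
```

The memory held by the full operator grows with the window contents, which is why garbage collection matters for full operators.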

For more application scenarios, see Streaming Engines.

Windowing logic

Window boundaries: The engine automatically adjusts the starting point of the first window (see the descriptions of parameters step and roundTime, and the alignment rules).

Window properties:

  • windowSize - the size of each window;

  • closed - whether the left or the right boundary of a window is inclusive;

  • step - the duration of time between windows;

  • useSystemTime specifies how values are windowed - based on the time column in the data or the system time of data ingestion.
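How windowSize and step interact can be sketched in Python. The sketch assumes left-closed, right-open windows whose end times fall on multiples of step (the actual alignment is described under the alignment rules); the function name windows_containing is hypothetical.

```python
def windows_containing(t, window_size, step):
    """Return the (start, end) pairs of all windows [start, end) that contain t.

    Assumes window end times are multiples of step and closed='left'.
    """
    first_end = (t // step + 1) * step  # earliest window end strictly after t
    return [(end - window_size, end)
            for end in range(first_end, t + window_size + 1, step)]


# Tumbling windows (windowSize == step): each record falls in exactly one window.
print(windows_containing(65, 60, 60))   # [(60, 120)]
# Sliding windows (windowSize == 2 * step): each record falls in two windows.
print(windows_containing(65, 120, 60))  # [(0, 120), (60, 180)]
```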

Calculation Rules

  • If timeColumn is specified, its values must be increasing. If keyColumn is specified to group the data, the values in timeColumn must be increasing within each group specified by keyColumn. Otherwise, out-of-order data will be discarded.
  • If useSystemTime = true, the calculation of a window is triggered as soon as the window ends. If useSystemTime = false (with timeColumn specified), the calculation of a window is triggered by the arrival of the next record after the window ends. To trigger the calculation for the uncalculated windows, you can specify the parameter updateTime or forceTriggerTime.
  • If fill is unspecified or 'none', only windows with calculation results are output. If fill is set to any other value, all windows are output, and the empty windows are filled using the specified filling method.
  • Since version 2.00.11, if updateTime = 0, incoming records in the current window can be immediately calculated and output.

Other Features

  • Data/state cleanup: You can set a cleanup rule to clear historical data. (See parameters garbageSize and keyPurgeFreqInSec)

  • Snapshot: Snapshot mechanism is used to restore the streaming engine to the latest snapshot after system interruption. (See parameters snapshotDir and snapshotIntervalInMsgCount)

  • High availability: To enable high availability for streaming engines, specify the parameter raftGroup on the leader of the raft group on the subscriber. When a leader is down, the raft group automatically switches to a new leader to resubscribe to the stream table.

Arguments

name is a string indicating the name of the engine. It is the unique identifier of an engine on a data or compute node. It can contain letters, digits, and underscores (_), and must start with a letter.

windowSize is a scalar or vector with positive integers that specifies the size of the windows for calculation.

step is a positive integer indicating how much each window moves forward relative to the previous one. Note that windowSize must be divisible by step (that is, windowSize must be an integer multiple of step); otherwise an exception will be thrown.

The units of windowSize and step are determined by the value of useSystemTime.

  • If useSystemTime = true, the unit of windowSize and step is millisecond.

  • If useSystemTime = false, the unit of windowSize and step is the same as the unit of timeColumn.

metrics is metacode or a tuple specifying the calculation formulas. For more information about metacode please refer to Metaprogramming.

  • It can use one or more built-in or user-defined aggregate functions (which must be defined with the defg keyword) such as <[sum(volume), avg(price)]>, or expressions of aggregate functions such as <[avg(price1)-avg(price2)]>, or aggregate functions involving multiple columns such as <[std(price1-price2)]>.

  • You can specify functions that return multiple values for metrics, such as <func(price) as `col1`col2> (it's optional to specify the column names).

  • If metrics is a tuple with multiple formulas, windowSize must be specified as a vector of the same length as metrics, and each element of windowSize corresponds to the element of metrics at the same position. For example, if windowSize=[10,20], metrics can be (<[min(volume), max(volume)]>, <sum(volume)>). metrics can also be a nested tuple, such as [[<[min(volume), max(volume)]>, <sum(volume)>], [<avg(volume)>]].

  • The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.

dummyTable is a table object whose schema must be the same as the subscribed stream table. Whether dummyTable contains data does not matter.

outputTable is a table to which the engine inserts calculation results. It can be an in-memory table or a DFS table. Create an empty table and specify the column names and types before calling the function.

The output columns are in the following order:

(1) The first column must be a time column.

  • If useSystemTime = true, it is TIMESTAMP; otherwise, it has the same data type as timeColumn;
  • If useWindowStartTime = true, the column displays the start time of each window; otherwise, it displays the end time of each window.

(2) If keyColumn is specified, the subsequent column(s) must be in the same order as that specified by keyColumn.

(3) If outputElapsedMicroseconds is set to true, you need to specify a column of LONG type. See the outputElapsedMicroseconds parameter for details.

(4) Then followed by one or more result columns.

Note: Starting from version 2.00.10, the engine supports a user-defined aggregate function to merge multiple results in an array vector column. The result column in the output table must be in the form of array vector (i.e., append a pair of square brackets ([]) to the data type). See example 3.

timeColumn (optional) is a STRING scalar or vector specifying the time column(s) of the subscribed stream table. When useSystemTime = false, it must be specified.

Note: If timeColumn is a vector, it must have a date element (of DATE type) and a time element (of TIME, SECOND or NANOTIME type). In this case, the first column in outputTable must take the data type of concatDateTime(date, time).
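As a rough analogy for how a date element and a time element combine into one timestamp, the following Python sketch uses the standard library. It only mirrors the idea of concatDateTime; the helper name concat_date_time is hypothetical, and DolphinDB's own type rules determine the actual result type.

```python
from datetime import date, time, datetime


def concat_date_time(d, t):
    """Combine a date element and a time element into a single timestamp."""
    return datetime.combine(d, t)


# A DATE value and a SECOND-precision time value combine into one date-time value.
combined = concat_date_time(date(2021, 4, 5), time(9, 25, 1))
print(combined)  # 2021-04-05 09:25:01
```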

useSystemTime (optional) is a Boolean value indicating whether the calculations are performed based on the system time when data is ingested into the engine.

  • useSystemTime = true: the engine will regularly window the streaming data at fixed time intervals for calculations according to the ingestion time (local system time with millisecond precision, independent of any temporal columns in the streaming table) of each record. As long as a window contains data, the calculation will be performed automatically when the window ends. The first column in output table indicates the timestamp when the calculation occurred.

  • useSystemTime = false (default): the engine will window the streaming data according to the timeColumn in the stream table. The calculation for a window is triggered by the first record that arrives after the window ends. Note that the record that triggers the calculation does not participate in it.

For example, consider a window that ranges from 10:10:10 to 10:10:19. If useSystemTime = true and the window is not empty, the calculation will be triggered at 10:10:20. If useSystemTime = false and the first record after 10:10:19 is at 10:10:25, the calculation will be triggered at 10:10:25.

keyColumn (optional) is a STRING scalar/vector indicating the name of the grouping column(s). If it is specified, the engine conducts the calculations within each group. For example, group the data by stock symbol and apply moving aggregation functions to each stock.

garbageSize (optional) is a positive integer. The default value is 50,000 (rows). The subscribed data continues to accumulate in the engine. When the number of rows of historical data in the memory exceeds garbageSize, the system will clear the historical data that is no longer needed.

Note: For incremental operators, the data no longer needed will be automatically removed from memory. However, for full operators, this parameter must be specified to enable garbage collection.

updateTime (optional) is a non-negative integer which takes the same time precision as timeColumn. It is used to trigger window calculations at an interval shorter than step. step must be a multiple of updateTime. To specify updateTime, useSystemTime must be set to false.

If updateTime is not specified, the calculation for a window will not occur before the window ends. By specifying updateTime, you can have the values calculated several times within a window whose calculation has not been triggered for a long time.

The calculations within a window are triggered with the following rules:

  • Starting from the left boundary of the window, each time a new record arrives after another interval of updateTime has passed, all data before this record in the current window is calculated. If the window still has unprocessed data after 2*updateTime (or after 2 seconds, whichever is longer), all data in this window is calculated.

  • If keyColumn is specified, these rules apply within each group.

It is recommended to specify a keyed table for outputTable if updateTime is set. If outputTable is a standard in-memory table or stream table, it will have multiple results for each timestamp (in each group). It is not recommended to use a keyed stream table either as the records of a keyed stream table cannot be updated.

useWindowStartTime (optional) is a Boolean value indicating whether the time column in outputTable is the starting time of the windows. The default value is false, which means the timestamps in the output table are the end time of the windows. If the windowSize is a vector, useWindowStartTime must be false.

roundTime (optional) is a Boolean value indicating the method to align the window boundary if the time precision is milliseconds or seconds and step is bigger than one minute. The default value is true indicating the alignment is based on the multi-minute rule (see the alignment rules). False means alignment is based on the one-minute rule.

snapshotDir (optional) is a string indicating the directory where the streaming engine snapshot is saved. The directory must already exist, otherwise an exception is thrown. If snapshotDir is specified, the system checks whether a snapshot already exists in the directory when creating a streaming engine. If it exists, the snapshot will be loaded to restore the engine state. Multiple streaming engines can share a directory where the snapshot files are named as the engine names.

The file extension of a snapshot can be:

    • <engineName>.tmp: temporary snapshot

    • <engineName>.snapshot: a snapshot that is generated and flushed to disk

    • <engineName>.old: if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.

snapshotIntervalInMsgCount (optional) is a positive integer indicating the number of messages to receive before the next snapshot is saved.

fill (optional) is a vector/scalar indicating the filling method to deal with an empty window (in a group). It can be:

  • 'none': no result

  • 'null': output a NULL value.

  • 'ffill': output the result in the last window.

  • specific value: output the specified value. Its type should be the same as metrics output's type.

fill can also be a vector that specifies a different filling method for each metric. The size of the vector must equal the number of elements specified in metrics. The elements of the vector cannot be 'none'.
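The filling methods can be modeled with a short Python sketch. It assumes a single metric and a single group; the function name fill_windows and the input layout (a dict from window end time to result) are hypothetical. In this sketch, 'ffill' yields None when no earlier window has produced a result, which is an assumption rather than documented behavior.

```python
def fill_windows(results, window_ends, fill="none"):
    """results maps window end -> value for non-empty windows only."""
    output = []
    last = None
    for end in window_ends:
        if end in results:
            last = results[end]
            output.append((end, last))
        elif fill == "none":
            continue                      # empty windows produce no row
        elif fill == "null":
            output.append((end, None))    # empty windows produce a NULL
        elif fill == "ffill":
            output.append((end, last))    # reuse the previous window's result
        else:
            output.append((end, fill))    # a specific value
    return output


results = {46: 1, 58: 8}        # windows ending at :52 and :64 are empty
ends = [46, 52, 58, 64]
for method in ("none", "null", "ffill", 1000):
    print(method, fill_windows(results, ends, method))
```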

forceTriggerTime (optional) is a non-negative integer which takes the same time precision as timeColumn, indicating the waiting time to force trigger calculation in the uncalculated windows for each group. If forceTriggerTime is set, useSystemTime must be false and updateTime cannot be specified.

The rules are as follows:

(1) Suppose the end time of the uncalculated window is t, and an incoming record of another group arrives at t1: when t1-t>=forceTriggerTime, calculation of the window will be triggered.

(2) If no data is ingested into a group after the last window is calculated, and new data continues to ingest into other groups, the specified fill parameter can be used to fill results for empty windows of that group. The group's windows will still be output at the latest time point. If parameter fill is not specified, no new windows will be generated for that group after the last window has been triggered for computation.
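Rule (1) reduces to a simple comparison, sketched below in Python. The function name should_force_trigger is hypothetical, and times are plain integers in the unit of timeColumn.

```python
def should_force_trigger(window_end, arrival_time, force_trigger_time):
    """Rule (1): a record of another group arriving at t1 triggers the
    uncalculated window ending at t once t1 - t >= forceTriggerTime."""
    return arrival_time - window_end >= force_trigger_time


# Uncalculated window ends at second 46; forceTriggerTime = 7.
# A record of another group at second 52 is too early, one at second 54 is not.
checks = [should_force_trigger(46, t1, 7) for t1 in (52, 54)]
print(checks)  # [False, True]
```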

Note the following points when setting forceTriggerTime or updateTime:

  • If updateTime is specified, the result of the current window calculation will be updated again when data belonging to the current window still arrives after the calculation is triggered.

  • If forceTriggerTime is specified, the incoming data with a timestamp within the current window will be discarded after the calculation is forced to be triggered.

raftGroup (optional) is an integer greater than 1, indicating the ID of the raft group on the high-availability streaming subscriber specified by the configuration parameter streamingRaftGroups. Specify raftGroup to enable high availability on the streaming engine. When an engine is created on the leader, it is also created on each follower and the engine snapshot is synchronized to the followers. When a leader is down, the raft group automatically switches to a new leader to resubscribe to the stream table. Note that snapshotDir must also be specified when specifying a raft group.

keyPurgeFreqInSec (optional) is a positive integer indicating the interval (in seconds) to remove groups with no incoming data for a long time. If a group has no incoming data for at least keyPurgeFreqInSec seconds after the last time of data purging, it will be removed.

Note: To specify this parameter, parameter forceTriggerTime must be specified and parameter fill cannot be specified.

You can check the number of groups in a time-series streaming engine based on the column "numGroups" returned by getStreamEngineStat.

closed (optional) is a STRING indicating whether the left or the right boundary is included.

  • closed = 'left': left-closed, right-open

  • closed = 'right': left-open, right-closed
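The two boundary conventions differ only in which end point belongs to the window, as the following Python sketch shows. The function name in_window is hypothetical.

```python
def in_window(t, start, end, closed="left"):
    """Return True if timestamp t belongs to the window bounded by start and end."""
    if closed == "left":
        return start <= t < end   # left-closed, right-open: [start, end)
    if closed == "right":
        return start < t <= end   # left-open, right-closed: (start, end]
    raise ValueError("closed must be 'left' or 'right'")


# A record exactly on a boundary belongs to different windows under each setting.
print(in_window(60, 60, 120, "left"), in_window(60, 60, 120, "right"))    # True False
print(in_window(120, 60, 120, "left"), in_window(120, 60, 120, "right"))  # False True
```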

outputElapsedMicroseconds (optional) is a BOOLEAN value. The default value is false. It determines whether to output the elapsed time (in microseconds) from the time the calculation is triggered to the output of result for each window.

subWindow (optional) is a pair of integers or DURATION values, indicating the range of the subwindow within the window specified by windowSize. If specified, only results calculated within subwindows will be returned and the time column of the output table displays the end time of each subwindow. The calculation of the subwindow will be triggered by the arrival of the next record after the subwindow ends. The boundary of the subwindow is determined by parameter closed. When subWindow is a pair of integers, it takes the same time precision as timeColumn.

If it is specified, note that:

  • windowSize must be equal to step.

  • Neither updateTime>0 nor useSystemTime=true can be specified.
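A subwindow can be modeled as a pair of offsets applied to the start of each window. The Python sketch below assumes the offsets are measured from the window start and uses milliseconds as plain integers; the function names are hypothetical.

```python
def subwindow_bounds(window_start, sub_window):
    """Translate a (low, high) offset pair into absolute subwindow bounds."""
    low, high = sub_window
    return window_start + low, window_start + high


def subwindow_sum(records, window_start, sub_window, closed="left"):
    """Sum the values of records whose timestamps fall inside the subwindow."""
    low, high = subwindow_bounds(window_start, sub_window)
    if closed == "left":
        return sum(v for t, v in records if low <= t < high)
    return sum(v for t, v in records if low < t <= high)


# Records of one group as (milliseconds from window start, volume).
records = [(1785, 10), (10000, 14), (12457, 28)]
# A 10-second subwindow at the start of a 1-minute window.
print(subwindow_sum(records, 0, (0, 10000), closed="left"))   # 10
print(subwindow_sum(records, 0, (0, 10000), closed="right"))  # 24
```

With closed='left', the record exactly at the 10-second mark is excluded; with closed='right' it is included.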

parallelism (optional) is a positive integer no greater than 63, representing the number of worker threads for parallel computation. The default value is 1. For compute-intensive workloads, adjusting this parameter appropriately can effectively utilize computing resources and reduce computation time. It is recommended to set a value less than the number of CPU cores, normally from 4 to 8.

acceptedDelay (optional) is a positive integer smaller than windowSize. It only takes effect when useSystemTime=true, specifying the maximum delay for each window to accept data, with a default value of 0. Data received within the acceptedDelay time after the window ends will still be considered part of the current window and participate in the computation, and will not be included in the computation of the next window.

outputHandler (optional) is a unary function or a partial function with a single unfixed parameter. If set, the engine will not write the calculation results to the output table directly. Instead, the results will be passed as a parameter to the outputHandler function. The default value is NULL, which means the result will be written to the output table.

msgAsTable (optional) is a Boolean scalar indicating whether the output data is passed into the function specified by outputHandler as a table or as a tuple. If msgAsTable=true, the output data is passed into the function as a table. The default value is false, which means the output data is passed into the function as a tuple of columns.

Alignment Rules

To facilitate observation and comparison of calculation results, the engine automatically adjusts the starting point of the first window. The alignment size (an integer) is determined by the parameters step and roundTime and by the precision of timeColumn. When the time-series engine calculates within groups, the windows of all groups are aligned uniformly, so the window boundaries are the same for every group.

  • If the data type of timeColumn is MINUTE(HH:mm), the value of alignmentSize is as follows:

    if roundTime=false:

    step alignmentSize
    0~2 2
    3 3
    4~5 5
    6~10 10
    11~15 15
    16~20 20
    21~30 30
    >30 60 (1 hour)

    if roundTime=true:

    The value of alignmentSize is the same as in the table above if step<=30. If step>30, the value of alignmentSize is as follows:

    step alignmentSize
    31~60 60 (1 hour)
    61~120 120 (2 hours)
    121~180 180 (3 hours)
    181~300 300 (5 hours)
    301~600 600 (10 hours)
    601~900 900 (15 hours)
    901~1200 1200 (20 hours)
    1201~1800 1800 (30 hours)
    >1800 3600 (60 hours)
  • If the data type of timeColumn is DATETIME (yyyy-MM-dd HH:mm:ss) or SECOND (HH:mm:ss), the value of alignmentSize is as follows:

    if roundTime=false:

    step alignmentSize
    0~2 2
    3 3
    4~5 5
    6~10 10
    11~15 15
    16~20 20
    21~30 30
    >30 60 (1 minute)

    if roundTime=true:

    The value of alignmentSize is the same as in the table above if step<=30. If step>30, the value of alignmentSize is as follows:

    step alignmentSize
    31~60 60 (1 minute)
    61~120 120 (2 minutes)
    121~180 180 (3 minutes)
    181~300 300 (5 minutes)
    301~600 600 (10 minutes)
    601~900 900 (15 minutes)
    901~1200 1200 (20 minutes)
    1201~1800 1800 (30 minutes)
    >1800 3600 (1 hour)
  • If the data type of timeColumn is TIMESTAMP(yyyy-MM-dd HH:mm:ss.mmm) or TIME(HH:mm:ss.mmm), the value of alignmentSize is as follows:

    if roundTime=false:

    step alignmentSize
    0~2 2
    3~5 5
    6~10 10
    11~20 20
    21~25 25
    26~50 50
    51~100 100
    101~200 200
    201~250 250
    251~500 500
    501~1000 1000 (1 second)
    1001~2000 2000 (2 seconds)
    2001~5000 5000 (5 seconds)
    5001~10000 10000 (10 seconds)
    10001~15000 15000 (15 seconds)
    15001~20000 20000 (20 seconds)
    20001~30000 30000 (30 seconds)
    >30000 60000 (1 minute)

    if roundTime=true:

    The value of alignmentSize is the same as in the table above if step<=30000. If step>30000, the value of alignmentSize is as follows:

    step alignmentSize
    30001~60000 60000 (1 minute)
    60001~120000 120000 (2 minutes)
    120001~300000 300000 (5 minutes)
    300001~600000 600000 (10 minutes)
    600001~900000 900000 (15 minutes)
    900001~1200000 1200000 (20 minutes)
    1200001~1800000 1800000 (30 minutes)
    >1800000 3600000 (1 hour)
  • If the data type of timeColumn is NANOTIMESTAMP(yyyy-MM-dd HH:mm:ss.nnnnnnnnn) or NANOTIME(HH:mm:ss.nnnnnnnnn), the value of alignmentSize is as follows:

    if roundTime=false:

    step alignmentSize
    0~2ns 2ns
    3ns~5ns 5ns
    6ns~10ns 10ns
    11ns~20ns 20ns
    21ns~25ns 25ns
    26ns~50ns 50ns
    51ns~100ns 100ns
    101ns~200ns 200ns
    201ns~250ns 250ns
    251ns~500ns 500ns
    >500ns 1000ns

    if roundTime=true:

    step alignmentSize
    1000ns~1ms 1ms
    1ms~10ms 10ms
    10ms~100ms 100ms
    100ms~1s 1s
    1s~2s 2s
    2s~3s 3s
    3s~5s 5s
    5s~10s 10s
    10s~15s 15s
    15s~20s 20s
    20s~30s 30s
    >30s 1min

If the time of the first record is x with data type of TIMESTAMP, then the starting time of the first window is adjusted to timeType_cast(x/alignmentSize*alignmentSize+step-windowSize), where "/" is integer division (only the integer part of the quotient is kept). For example, if the time of the first record is 2018.10.08T01:01:01.365, windowSize = 120000, and step = 60000, then alignmentSize = 60000, and the starting time of the first window is timestamp(2018.10.08T01:01:01.365/60000*60000+60000-120000)=2018.10.08T01:00:00.000.
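The alignment formula for a millisecond-precision time column can be sketched in Python as follows. This is an illustration under stated assumptions, not the engine's code: the function names are hypothetical, the lookup lists are transcribed from the TIMESTAMP/TIME tables above (where the alignmentSize of each step range equals the upper bound of that range), and timestamps are converted to milliseconds since 1970.01.01.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

# Upper bounds of the step ranges in the millisecond tables.
BASE_SIZES = [2, 5, 10, 20, 25, 50, 100, 200, 250, 500, 1000, 2000,
              5000, 10000, 15000, 20000, 30000]
ROUND_SIZES = [60000, 120000, 300000, 600000, 900000, 1200000, 1800000]


def alignment_size_ms(step, round_time=True):
    """Look up alignmentSize for a millisecond-precision time column."""
    for size in BASE_SIZES:
        if step <= size:
            return size
    if not round_time:
        return 60000          # one-minute rule for every step > 30000
    for size in ROUND_SIZES:
        if step <= size:
            return size
    return 3600000            # multi-minute rule caps at one hour


def first_window_start(first_record, window_size, step, round_time=True):
    """Apply x/alignmentSize*alignmentSize + step - windowSize with integer division."""
    x = (first_record - EPOCH) // timedelta(milliseconds=1)
    a = alignment_size_ms(step, round_time)
    start = x // a * a + step - window_size
    return EPOCH + timedelta(milliseconds=start)


# First record at 2018.10.08T01:01:01.785 with windowSize = step = 60000.
start = first_window_start(datetime(2018, 10, 8, 1, 1, 1, 785000), 60000, 60000)
print(start)  # 2018-10-08 01:01:00
```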

Examples

Example 1

In the following example, the time-series engine1 subscribes to the stream table "trades" and calculates sum(volume) for each stock in the last minute in real time. The result is saved in table output1.

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
engine1 = createTimeSeriesEngine(name="engine1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);

insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)
insert into trades values(2018.10.08T01:04:05.152,`B,23)

sleep(10)

select * from output1;
time sym sumVolume
2018.10.08T01:02:00.000 A 38
2018.10.08T01:02:00.000 B 40
2018.10.08T01:03:00.000 A 25
2018.10.08T01:03:00.000 B 9

The following paragraphs explain in detail how the time-series engine conducts the calculations. For simplicity, regarding the "time" column we ignore the part of "2018.10.08T" and only use the "hour:minute:second.millisecond" part.

First, the time-series engine adjusts the starting time of the first window to be 01:01:00.000. The first window is from 01:01:00.000 (inclusive) to 01:02:00.000 (exclusive). When the record (01:02:10.789,`A,15) arrives, it triggers the calculation of group A for the first window; the arrival of (01:02:12.005,`B,9) triggers the calculation of group B for the first window.

The second window is from 01:02:00.000 (inclusive) to 01:03:00.000 (exclusive). When the record (01:04:02.236,`A,29) arrives, it triggers the calculation of group A for the second window; the arrival of (01:04:04.412,`B,32) triggers the calculation of group B for the second window.

As no records arrive at or after 01:05:00.000, no calculations are triggered for the window of [01:04:00.000, 01:05:00.000).
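The triggering logic described above can be reproduced with a small Python simulation. It is a simplified model, not the engine itself: it assumes tumbling one-minute windows aligned to whole minutes, closed='left', fill='none', in-order data within each group, and window end times in the output; the helper names are hypothetical.

```python
WINDOW = 60_000  # windowSize = step = 60000 ms


def to_ms(hms):
    """Convert 'HH:mm:ss.SSS' to milliseconds since midnight."""
    h, m, rest = hms.split(":")
    s, ms = rest.split(".")
    return ((int(h) * 60 + int(m)) * 60 + int(s)) * 1000 + int(ms)


def fmt(ms):
    s, ms = divmod(ms, 1000)
    m, s = divmod(s, 60)
    h, m = divmod(m, 60)
    return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}"


def run(records):
    state = {}    # sym -> [current window start, running sum]
    output = []
    for time_str, sym, volume in records:
        t = to_ms(time_str)
        start = t // WINDOW * WINDOW
        if sym not in state:
            state[sym] = [start, 0]
        elif t >= state[sym][0] + WINDOW:
            # The first record after the window ends triggers the calculation
            # for its group; the triggering record itself is not included.
            output.append((fmt(state[sym][0] + WINDOW), sym, state[sym][1]))
            state[sym] = [start, 0]
        state[sym][1] += volume
    return output


records = [
    ("01:01:01.785", "A", 10), ("01:01:02.125", "B", 26),
    ("01:01:10.263", "B", 14), ("01:01:12.457", "A", 28),
    ("01:02:10.789", "A", 15), ("01:02:12.005", "B", 9),
    ("01:02:30.021", "A", 10), ("01:04:02.236", "A", 29),
    ("01:04:04.412", "B", 32), ("01:04:05.152", "B", 23),
]
for row in run(records):
    print(row)
```

The simulation emits four rows, (01:02:00.000, A, 38), (01:02:00.000, B, 40), (01:03:00.000, A, 25) and (01:03:00.000, B, 9), and leaves the window [01:04:00.000, 01:05:00.000) uncalculated.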

The table output1 stores the calculation results of the time-series engine. As useWindowStartTime=false, the timestamps in the output table are the end time of the windows. If useWindowStartTime=true, then the timestamps in the output table are the starting time of the windows, as illustrated in the following example:

output2 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
engine2 = createTimeSeriesEngine(name="engine2", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output2, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=true)
subscribeTable(tableName="trades", actionName="engine2", offset=0, handler=append!{engine2}, msgAsTable=true)

sleep(10)
select * from output2;
time sym sumVolume
2018.10.08T01:01:00.000 A 38
2018.10.08T01:01:00.000 B 40
2018.10.08T01:02:00.000 A 25
2018.10.08T01:02:00.000 B 9

Example 2

In the following example, we specify updateTime to be 1000 (milliseconds):

output3 = keyedTable(`time`sym,10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
engine3 = createTimeSeriesEngine(name="engine3", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output3, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, updateTime=1000, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="engine3", offset=0, handler=append!{engine3}, msgAsTable=true)

sleep(2001)
select * from output3;
time sym sumVolume
2018.10.08T01:02:00.000 A 38
2018.10.08T01:02:00.000 B 40
2018.10.08T01:03:00.000 A 25
2018.10.08T01:03:00.000 B 9
2018.10.08T01:05:00.000 B 55
2018.10.08T01:05:00.000 A 29

Next, we will explain the calculations triggered in the last window from 01:04:00.000 to 01:05:00.000.

(1) At 01:04:04.236, 2000 milliseconds after the first record of group A arrived, a group A calculation is triggered. The result (01:05:00.000, `A, 29) is written to the output table.

(2) The record of group B at 01:04:05.152 is the first record after the small window of [01:04:04.000, 01:04:05.000) that contains the group B record at 01:04:04.412. It triggers a group B calculation. The result (01:05:00.000,"B",32) is written to the output table.

(3) 2000 milliseconds later, at 01:04:07.152, as the group B record at 01:04:05.152 has not been used in a calculation, a group B calculation is triggered. The result is (01:05:00.000,"B",55). As the output table's keys are columns 'time' and 'sym', the record of (01:05:00.000,`B,32) in the output table is updated and becomes (01:05:00.000,`B,55).

In the following example, the shared stream table "pubT" contains two time columns of types DATE and SECOND. When creating the time-series engine, the two time columns can be combined into one DATETIME column in the output table "streamMinuteBar_1min" by specifying both columns in timeColumn.

colNames=`symbol`date`minute`price`type`volume
colTypes=`SYMBOL`DATE`SECOND`DOUBLE`STRING`INT
pubTable = streamTable(10000:0,colNames,colTypes)
share pubTable as pubT

colNames = `time`symbol`open`max`min`close`volume`amount`ret`vwap
colTypes = `DATETIME`SYMBOL`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`DOUBLE`DOUBLE
streamMinuteBar_1min = streamTable(10000:0,colNames, colTypes)

tsAggrOHLC = createTimeSeriesEngine(name="subT", windowSize=60, step=60, metrics=<[first(price) as open ,max(price) as max,min(price) as min ,last(price) as close ,sum(volume) as volume ,wsum(volume, price) as amount ,((last(price)-first(price))/first(price)) as ret, (wsum(volume, price)/sum(volume)) as vwap]>, dummyTable=pubTable, outputTable=streamMinuteBar_1min, timeColumn=`date`minute, useSystemTime=false, keyColumn='symbol', fill=`none)
subscribeTable(tableName="pubT", actionName="subT", offset=-1, handler=append!{tsAggrOHLC}, msgAsTable=true)

insert into pubT values(`000001, 2021.04.05, 09:25:01, 1, 'B', 1)
insert into pubT values(`000001, 2021.04.05, 09:30:05, 2, 'B', 1)
insert into pubT values(`000001, 2021.04.05, 09:31:06, 3, 'B', 1)
insert into pubT values(`000001, 2021.04.05, 09:35:05, 4, 'S', 4)
insert into pubT values(`000001, 2021.04.05, 09:40:05, 5, 'S', 5)
insert into pubT values(`000001, 2021.04.06, 09:25:05, 6, 'S', 6)

select * from pubT
symbol date minute price type volume
000001 2021.04.05 09:25:01 1 B 1
000001 2021.04.05 09:30:05 2 B 1
000001 2021.04.05 09:31:06 3 B 1
000001 2021.04.05 09:35:05 4 S 4
000001 2021.04.05 09:40:05 5 S 5
000001 2021.04.06 09:25:05 6 S 6

select * from streamMinuteBar_1min
time symbol open max min close volume amount ret vwap
2021.04.05T09:26:00 000001 1 1 1 1 1 1 0 1
2021.04.05T09:31:00 000001 2 2 2 2 1 2 0 2
2021.04.05T09:32:00 000001 3 3 3 3 1 3 0 3
2021.04.05T09:36:00 000001 4 4 4 4 4 16 0 4
2021.04.05T09:41:00 000001 5 5 5 5 5 25 0 5
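
In the following example, forceTriggerTime is set to 7 (seconds) and fill is set to 1000, so uncalculated windows of a group are force-triggered by records arriving in other groups, and empty windows are filled with 1000:

share streamTable(1000:0, `time`sym`qty, [DATETIME, SYMBOL, INT]) as trades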
output3 = table(10000:0, `time`sym`sumQty, [DATETIME, SYMBOL, INT])

engine = createTimeSeriesEngine(name="engine", windowSize=6, step=6, metrics=<sum(qty)>, dummyTable=trades, outputTable=output3, timeColumn=`time,keyColumn=`sym, forceTriggerTime=7,fill=1000)
subscribeTable(tableName="trades", actionName="engine", offset=0, handler=append!{engine}, msgAsTable=true)
sleep(1000)
insert into engine values(2018.08.01T14:05:43,`A,1)
insert into engine values(2018.08.01T14:05:43,`C,3)
sleep(10)
insert into engine values(2018.08.01T14:05:44,`B,1)
sleep(80)
insert into engine values(2018.08.01T14:05:52,`B,3)
sleep(20)
insert into engine values(2018.08.01T14:05:54,`A,3)
sleep(10)
insert into engine values(2018.08.01T14:05:55,`A,5)
sleep(20)
insert into engine values(2018.08.01T14:05:57,`B,5)
sleep(50)
insert into engine values(2018.08.01T14:06:12,`A,1)
sleep(50)
select * from output3 order by sym
time sym sumQty
2018.08.01T14:05:46 A 1
2018.08.01T14:05:52 A 1,000
2018.08.01T14:05:58 A 8
2018.08.01T14:06:04 A 1,000
2018.08.01T14:06:10 A 1,000
2018.08.01T14:05:46 B 1
2018.08.01T14:05:52 B 1,000
2018.08.01T14:05:58 B 8
2018.08.01T14:05:46 C 3
2018.08.01T14:05:52 C 1,000

Example 3

The following example calculates the first/last volume in each window and merges the result in an array vector column.

// Define a function toVector with defg. The results are merged into array vectors
defg toVector(x){
    return x
}
share streamTable(1000:0, `time`sym`volume`price, [TIMESTAMP, SYMBOL, DOUBLE,DOUBLE]) as trades
// Define the output table and specify the result column as DOUBLE[] type
output1 = table(10000:0, `time`sym`sumVolume`avg, [TIMESTAMP,STRING,DOUBLE[],DOUBLE])
// Call toVector in metrics to combine the first and last volumes in a 1-minute window into an array vector
engine1 = createTimeSeriesEngine(name="engine1", windowSize=60000, step=60000, metrics=<[toVector([first(volume),last(volume)]),avg(volume+price)]>, dummyTable=trades, outputTable=output1, timeColumn=`time, keyColumn=`sym , useSystemTime=false, garbageSize=50, useWindowStartTime=false)

times = sort(2023.10.08T00:00:00.000 + rand(1..(1+3000*200), 30))
syms = rand("A"+string(1..10), 30)
volumes = rand(rand(100.0, 10) join 2.3 NULL NULL NULL NULL, 30)
prices = rand(rand(100.0, 10) join 2.3 NULL NULL NULL NULL, 30)
t=table(times as time, syms as sym, volumes as volume,prices as price)

engine1.append!(t)

select * from output1 where time between 2023.10.08T00:01:00.000 and 2023.10.08T00:05:00.000 order by time,sym 
time sym sumVolume avg
2023.10.08 00:01:00.000 A1 [, ]
2023.10.08 00:02:00.000 A4 [2.3, 2.3]
2023.10.08 00:03:00.000 A10 [68.5665876371786, 68.5665876371786] 121.97567140683532
2023.10.08 00:03:00.000 A6 [, 22.65533998142928] 33.386794407851994

Example 4

The following example specifies the parameter subWindow.


share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
//specify a 10-second subwindow within a 1-min window. With closed unspecified, the subwindow takes the default value, i.e., left-closed, right-open window, [0s, 10s).
engine4 = createTimeSeriesEngine(name="engine4", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=output1, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=true, subWindow=0s:10s)

subscribeTable(tableName="trades", actionName="engine4", offset=0, handler=append!{engine4}, msgAsTable=true);

insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.000,`A,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)

sleep(10)

select * from output1;

The calculation for group "A" within the subwindow [2018.10.08T01:01:00.000, 2018.10.08T01:01:10.000) is triggered by the arrival of data at 2018.10.08T01:01:10.000. No new data from group "B" is received after the subwindow ends, so the calculation of group "B" is not triggered.

time sym sumVolume
2018.10.08T01:01:10.000 A 10