createCrossSectionalEngine
Syntax
createCrossSectionalEngine(name, [metrics], dummyTable, [outputTable],
keyColumn, [triggeringPattern='perBatch'], [triggeringInterval=1000],
[useSystemTime=true], [timeColumn], [lastBatchOnly=false], [contextByColumn],
[snapshotDir], [snapshotIntervalInMsgCount], [raftGroup],
[outputElapsedMicroseconds=false], [roundTime=true], [keyFilter])
Alias: createCrossSectionalAggregator
Details
This function creates a cross-sectional streaming engine and returns a keyed table with keyColumn as the key.
The keyed table is updated every time a new record arrives. If the parameter lastBatchOnly is set to true, the table only maintains the latest record in each group. When new data is ingested into the engine,
-
if metrics and outputTable are specified, the engine first updates the keyed table, then performs calculations on the latest data and outputs the results to outputTable.
-
if metrics and outputTable are not specified, the engine only updates the keyed table.
For more application scenarios, see Streaming Engines.
Calculation Rules
The calculation can be triggered by the number of records or time interval. See parameters triggeringPattern and triggeringInterval. Note that if contextByColumn is specified, the data will be grouped by the specified columns and calculated by group.
Features
-
Snapshot: The snapshot mechanism restores the streaming engine to its latest snapshot after a system interruption. (See parameters snapshotDir and snapshotIntervalInMsgCount.)
-
High availability: To enable high availability for a streaming engine, specify the parameter raftGroup when creating the engine on the leader of the raft group on the subscriber. When a leader is down, the raft group automatically switches to a new leader, which resubscribes to the stream table.
Arguments
name is a string specifying the engine name. It is the unique identifier of a cross-sectional engine on a data/compute node. It can contain letters, digits and underscores ("_"), and must start with a letter.
metrics (optional) is metacode or a tuple specifying the formulas for calculation. It can be:
- Built-in or user-defined aggregate functions, e.g., <[sum(qty), avg(price)]>; expressions on previous results, e.g., <[avg(price1)-avg(price2)]>; or calculations on multiple columns, e.g., <[std(price1-price2)]>.
- Functions with multiple returns, such as <func(price) as `col1`col2>. Specifying the column names is optional. For more information about metacode, see Metaprogramming.
- The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.
dummyTable is a table object whose schema must be the same as the stream table to which the engine subscribes. Whether dummyTable contains data does not matter.
outputTable (optional) is the output table for the results. It can be an in-memory table or a DFS table. Create an empty table and specify the column names and types before calling the function. Make sure that data types of columns storing calculation results are the same as the results of metrics. The output columns are in the following order:
(1) The first column is of TIMESTAMP type.
-
If useSystemTime = true, the column stores the time when each calculation starts;
-
If useSystemTime = false, it takes the values of timeColumn.
(2) The following column is the contextByColumn (if specified).
(3) If the outputElapsedMicroseconds is set to true, specify two more columns: a LONG column and an INT column.
(4) The remaining columns store the calculation results of metrics.
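As a rough illustration of this column order, the sketch below builds an output table for an engine that uses one contextByColumn, sets outputElapsedMicroseconds to true, and computes two DOUBLE metrics such as <[avg(price), max(price)]>. The table name outputTimed and all column names are assumptions made for this sketch; only the order and types of the columns follow the rules above.

```dolphindb
// Hypothetical output table for illustration.
// (1) TIMESTAMP column
// (2) contextByColumn (assumed here to be a SYMBOL column named sector)
// (3) LONG and INT columns required when outputElapsedMicroseconds=true
// (4) one column per metric
share table(1:0, `time`sector`elapsedMicros`batchRows`avgPrice`maxPrice, [TIMESTAMP, SYMBOL, LONG, INT, DOUBLE, DOUBLE]) as outputTimed
```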
keyColumn is a STRING scalar or vector that specifies one or more columns in the stream table as the key columns. For each unique value in the keyColumn, only the latest record is used in the calculation.
triggeringPattern (optional) is a STRING scalar specifying how to trigger the calculations. The engine returns a result every time a calculation is triggered. It can take one of the following values:
-
'perBatch' (default): calculates when a batch of data arrives.
-
'perRow': calculates when a new record arrives.
-
'interval': calculates at intervals specified by triggeringInterval, using system time.
-
'keyCount': When data with the same timestamp arrives in batches, a calculation is triggered when either of the following occurs:
-
the number of keys with the latest timestamp reaches triggeringInterval;
-
data with a newer timestamp arrives.
-
'dataInterval': calculates at intervals based on timestamps in the data. To use this, timeColumn must be specified and useSystemTime must be false.
Note: To set triggeringPattern to 'keyCount', timeColumn must be specified and useSystemTime must be set to false. In this case, out-of-order data is discarded.
triggeringInterval (optional) is interpreted according to the value of triggeringPattern:
-
If triggeringPattern = 'interval', triggeringInterval is a positive integer indicating the interval in milliseconds between 2 adjacent calculations. The default value is 1,000. Every triggeringInterval milliseconds, the system checks if the data in the engine has been calculated; if not, a calculation is triggered.
-
If triggeringPattern = 'keyCount', triggeringInterval can be:
-
an integer specifying a threshold. Before data with a greater timestamp arrives, a calculation is triggered when the number of uncalculated records reaches the threshold.
-
a tuple of 2 elements. The first element is an integer indicating the threshold of the number of records with the latest timestamp to trigger calculation. The second element is an integer or duration value.
For example, when triggeringInterval is set to (c1, c2):
-
If c2 is an integer and the number of keys with the latest timestamp t1 doesn't reach c1, calculation is not triggered and the system goes on to save data with a greater timestamp t2 (t2>t1). Data with t1 is calculated when either of the following events happens: the number of keys with timestamp t2 reaches c2, or data with a greater timestamp t3 (t3>t2) arrives. Note that c2 must be smaller than c1.
-
If c2 is a duration and the number of keys with the latest timestamp t1 doesn't reach c1, calculation is not triggered and the system goes on to save data with a greater timestamp t2 (t2>t1). Once data with t2 starts to come in, data with t1 is not calculated until any of the following events happens: the number of keys with timestamp t1 reaches c1, data with a greater timestamp t3 (t3>t2) arrives, or the duration c2 elapses.
-
If triggeringPattern= 'dataInterval', triggeringInterval is a positive integer measured in the same units as the timestamps in timeColumn. The default is 1,000. Starting with the first record, a window is started at intervals defined by triggeringInterval, based on the timestamp units. When the first record of the next window arrives, a calculation is triggered on all data in the current window.
-
In versions 1.30.23.2 and earlier, a calculation is triggered for each window.
-
Since version 1.30.23.3, a calculation is triggered only for windows containing data.
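The 'dataInterval' rule can be sketched as follows. This is a minimal example under stated assumptions: the names tradesDI, outputDI, csEngineDI, "csEngineDataIntervalDemo" and "dataIntervalStats" are invented for illustration, and the sample data is made up. Because timeColumn is of TIMESTAMP type, triggeringInterval=1000 is read here as 1000 milliseconds.

```dolphindb
// Hypothetical table and engine names, for illustration only.
share streamTable(10:0, `time`sym`price`volume, [TIMESTAMP,SYMBOL,DOUBLE,INT]) as tradesDI
share table(1:0, `time`avgPrice`maxPrice, [TIMESTAMP,DOUBLE,DOUBLE]) as outputDI

// 'dataInterval' requires timeColumn to be specified and useSystemTime=false.
csEngineDI = createCrossSectionalEngine(name="csEngineDataIntervalDemo", metrics=<[avg(price), max(price)]>, dummyTable=tradesDI, outputTable=outputDI, keyColumn=`sym, triggeringPattern="dataInterval", triggeringInterval=1000, useSystemTime=false, timeColumn=`time)
subscribeTable(tableName="tradesDI", actionName="dataIntervalStats", offset=-1, handler=append!{csEngineDI}, msgAsTable=true)

// The first two records are less than 1000 ms apart and fall into the first window.
insert into tradesDI values(2020.08.12T09:30:00.000 + 100 600, `A`B, 10.0 20.0, 20 10)
// This record belongs to a later window; its arrival triggers the calculation on the first window.
insert into tradesDI values(2020.08.12T09:30:00.000 + 1200, `A, 10.1, 20)
```

The records of the last window stay uncalculated until a record from a later window arrives, which follows from the triggering rule described above.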
useSystemTime (optional) is a Boolean value indicating whether the calculations are performed based on the system time when data is ingested into the engine.
-
If useSystemTime = true, the time column of outputTable is the system time;
-
If useSystemTime = false, the parameter timeColumn must be specified. The time column of outputTable uses the timestamp of each record.
timeColumn (optional) is a STRING scalar which specifies the time column in the stream table to which the engine subscribes if useSystemTime = false. It can only be of TIMESTAMP type.
lastBatchOnly (optional) is a Boolean value indicating whether to keep only the records with the latest timestamp in the engine. When lastBatchOnly = true, triggeringPattern must take the value 'keyCount', and the cross-sectional engine only maintains key values with the latest timestamp for calculation. Otherwise, the engine updates and retains all values for calculation.
contextByColumn (optional) is a STRING scalar or vector indicating the grouping column(s) based on which calculations are performed by group. This parameter only takes effect if metrics and outputTable are specified. If metrics only contains aggregate functions, the calculation results would be the same as a SQL query using group by. Otherwise, the results would be consistent with that using context by.
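A minimal sketch of grouped calculation with contextByColumn is given below. The names tradesCtx, outputCtx, csEngineCtx, "csEngineCtxDemo" and "ctxStats", the sector column, and the sample data are all assumptions made for this illustration. Since metrics contains only aggregate functions, each triggered calculation should produce one row per sector, as with a SQL group by.

```dolphindb
// Hypothetical stream table with an extra grouping column (sector).
share streamTable(10:0, `time`sym`sector`price, [TIMESTAMP,SYMBOL,SYMBOL,DOUBLE]) as tradesCtx
// Output columns: time, then the contextByColumn, then one column per metric.
share table(1:0, `time`sector`avgPrice`maxPrice, [TIMESTAMP,SYMBOL,DOUBLE,DOUBLE]) as outputCtx

csEngineCtx = createCrossSectionalEngine(name="csEngineCtxDemo", metrics=<[avg(price), max(price)]>, dummyTable=tradesCtx, outputTable=outputCtx, keyColumn=`sym, triggeringPattern="perBatch", useSystemTime=false, timeColumn=`time, contextByColumn=`sector)
subscribeTable(tableName="tradesCtx", actionName="ctxStats", offset=-1, handler=append!{csEngineCtx}, msgAsTable=true)

// One batch with two keys in each sector.
insert into tradesCtx values(2020.08.12T09:30:00.000 + 1 2 3 4, `A`B`C`D, `tech`tech`bank`bank, 10.0 20.0 30.0 40.0)
```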
To enable snapshot in the streaming engines, specify parameters snapshotDir and snapshotIntervalInMsgCount.
snapshotDir (optional) is a string indicating the directory where the streaming engine snapshot is saved. The directory must already exist, otherwise an exception is thrown. If snapshotDir is specified, the system checks whether a snapshot already exists in the directory when creating a streaming engine. If it exists, the snapshot will be loaded to restore the engine state. Multiple streaming engines can share a directory where the snapshot files are named as the engine names.
The file extension of a snapshot can be:
- <engineName>.tmp: temporary snapshot
- <engineName>.snapshot: a snapshot that is generated and flushed to disk
- <engineName>.old: if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.
snapshotIntervalInMsgCount (optional) is a positive integer indicating the number of messages to receive before the next snapshot is saved.
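The two snapshot parameters might be used together as in the sketch below. The directory path, the names tradesSnap, outputSnap, csEngineSnap, "csEngineSnapDemo" and "snapStats" are hypothetical. The resubscription pattern assumes that getSnapshotMsgId and appendMsg work with this engine as they do with other streaming engines, and that handlerNeedMsgId=true is required so the engine can record message IDs in its snapshots.

```dolphindb
// Hypothetical directory; snapshotDir must exist before the engine is created.
snapshotPath = "/home/server1/snapshotDir"
if(!exists(snapshotPath)) {
    mkdir(snapshotPath)
}

share streamTable(10:0, `time`sym`price, [TIMESTAMP,SYMBOL,DOUBLE]) as tradesSnap
share table(1:0, `time`avgPrice, [TIMESTAMP,DOUBLE]) as outputSnap

// Save a snapshot each time 100 messages have been received.
// If a snapshot named after this engine already exists in snapshotPath, it is loaded on creation.
csEngineSnap = createCrossSectionalEngine(name="csEngineSnapDemo", metrics=<[avg(price)]>, dummyTable=tradesSnap, outputTable=outputSnap, keyColumn=`sym, triggeringPattern="perBatch", useSystemTime=false, timeColumn=`time, snapshotDir=snapshotPath, snapshotIntervalInMsgCount=100)

// Assumed pattern: subscribe starting from the message that follows the last snapshot.
ofst = getSnapshotMsgId(csEngineSnap)
subscribeTable(tableName="tradesSnap", actionName="snapStats", offset=ofst+1, handler=appendMsg{csEngineSnap}, msgAsTable=true, handlerNeedMsgId=true)
```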
raftGroup (optional) is an integer greater than 1, indicating the ID of the raft group on the high-availability streaming subscriber specified by the configuration parameter streamingRaftGroups. Specify raftGroup to enable high availability for the streaming engine. When an engine is created on the leader, it is also created on each follower and the engine snapshot is synchronized to the followers. When a leader is down, the raft group automatically switches to a new leader to resubscribe to the stream table. Note that snapshotDir must also be specified when specifying a raft group.
outputElapsedMicroseconds (optional) is a Boolean value. The default value is false. It determines whether to output:
-
the elapsed time (in microseconds) from the ingestion of data to the output of result in each batch.
-
the total row number of each batch.
Note: When both outputElapsedMicroseconds and useSystemTime are set to true, aggregate functions cannot be used in metrics.
roundTime (optional) is a Boolean value indicating the method to align the window boundary when triggeringPattern='dataInterval'. The default value is true indicating the alignment is based on the multi-minute rule (see the alignment rules of time-series engine). False means alignment is based on the one-minute rule.
keyFilter (optional) is metacode of an expression or function call that returns a Boolean vector. It specifies the conditions for filtering keys in the keyed table returned by the engine. Only data with keys satisfying the filtering conditions will be taken for calculation.
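As a sketch of how keyFilter might be written, the example below restricts the calculation to two keys. The names tradesKF, outputKF, csEngineKF and "csEngineKeyFilterDemo" and the filter condition itself are assumptions for illustration; the expression only needs to return a Boolean vector over the keys.

```dolphindb
// Hypothetical table and engine names, for illustration only.
share streamTable(10:0, `time`sym`price, [TIMESTAMP,SYMBOL,DOUBLE]) as tradesKF
share table(1:0, `time`avgPrice, [TIMESTAMP,DOUBLE]) as outputKF

// Only records whose key (sym) is A or B are taken for calculation.
csEngineKF = createCrossSectionalEngine(name="csEngineKeyFilterDemo", metrics=<[avg(price)]>, dummyTable=tradesKF, outputTable=outputKF, keyColumn=`sym, triggeringPattern="perBatch", useSystemTime=false, timeColumn=`time, keyFilter=<sym in ["A", "B"]>)
```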
Examples
Example 1. A table "csEngine1" is created with function createCrossSectionalEngine and it subscribes to the stream table trades1. We set triggeringPattern to 'perRow', so each row inserted into table csEngine1 triggers a calculation.
share streamTable(10:0,`time`sym`price`volume,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades1
share table(1:0, `time`avgPrice`volume`dollarVolume`count, [TIMESTAMP,DOUBLE,INT,DOUBLE,INT]) as outputTable
csEngine1=createCrossSectionalEngine(name="csEngineDemo1", metrics=<[avg(price), sum(volume), sum(price*volume), count(price)]>, dummyTable=trades1, outputTable=outputTable, keyColumn=`sym, triggeringPattern="perRow", useSystemTime=false, timeColumn=`time)
subscribeTable(tableName="trades1", actionName="tradesStats", offset=-1, handler=append!{csEngine1}, msgAsTable=true)
insert into trades1 values(2020.08.12T09:30:00.000 + 123 234 456 678 890 901, `A`B`A`B`B`A, 10 20 10.1 20.1 20.2 10.2, 20 10 20 30 40 20);
select * from trades1;
time | sym | price | volume |
---|---|---|---|
2020.08.12T09:30:00.123 | A | 10 | 20 |
2020.08.12T09:30:00.234 | B | 20 | 10 |
2020.08.12T09:30:00.456 | A | 10.1 | 20 |
2020.08.12T09:30:00.678 | B | 20.1 | 30 |
2020.08.12T09:30:00.890 | B | 20.2 | 40 |
2020.08.12T09:30:00.901 | A | 10.2 | 20 |
select * from outputTable;
time | avgPrice | volume | dollarVolume | count |
---|---|---|---|---|
2020.08.12T09:30:00.123 | 10 | 20 | 200 | 1 |
2020.08.12T09:30:00.234 | 15 | 30 | 400 | 2 |
2020.08.12T09:30:00.456 | 15.05 | 30 | 402 | 2 |
2020.08.12T09:30:00.678 | 15.1 | 50 | 805 | 2 |
2020.08.12T09:30:00.890 | 15.15 | 60 | 1010 | 2 |
2020.08.12T09:30:00.901 | 15.2 | 60 | 1012 | 2 |
Example 2. When triggeringPattern is set to 'perBatch', insert 2 batches of data.
share streamTable(10:0,`time`sym`price`volume,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades2
share table(1:0, `time`avgPrice`volume`dollarVolume`count, [TIMESTAMP,DOUBLE,INT,DOUBLE,INT]) as outputTable
csEngine2=createCrossSectionalEngine(name="csEngineDemo2", metrics=<[avg(price), sum(volume), sum(price*volume), count(price)]>, dummyTable=trades2, outputTable=outputTable, keyColumn=`sym, triggeringPattern="perBatch", useSystemTime=false, timeColumn=`time)
subscribeTable(tableName="trades2", actionName="tradesStats", offset=-1, handler=append!{csEngine2}, msgAsTable=true)
insert into trades2 values(2020.08.12T09:30:00.000 + 123 234 456, `A`B`A, 10 20 10.1, 20 10 20);
sleep(1)
insert into trades2 values(2020.08.12T09:30:00.000 + 678 890 901, `B`B`A, 20.1 20.2 10.2, 30 40 20);
select * from outputTable;
time | avgPrice | volume | dollarVolume | count |
---|---|---|---|---|
2020.08.12T09:30:00.456 | 15.05 | 30 | 402 | 2 |
2020.08.12T09:30:00.901 | 15.2 | 60 | 1012 | 2 |
Example 3. The following example sets triggeringPattern to 'keyCount' and lastBatchOnly to true. Only the data with the latest timestamp will participate in calculation. Since there are both aggregate and non-aggregate functions set in metrics, the number of rows in the result table will be the same as that in the input table.
share streamTable(10:0,`time`sym`price`volume,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades1
share table(1:0, `time`factor1`factor2, [TIMESTAMP, DOUBLE,INT]) as outputTable
agg=createCrossSectionalEngine(name="csEngineDemo4", metrics=<[price+ 0.1, sum(volume)]>, dummyTable=trades1, outputTable=outputTable, keyColumn=`sym, triggeringPattern="keyCount", triggeringInterval=5, useSystemTime=false, timeColumn=`time,lastBatchOnly=true)
subscribeTable(tableName=`trades1, actionName="csEngineDemo4", msgAsTable=true, handler=append!{agg})
num=10
time=array(TIMESTAMP)
time=take(2018.01.01T09:30:00.000,num)
sym=take("A"+string(1..10),num)
price=1..num
volume=1..num
tmp=table(time, sym, price, volume)
trades1.append!(tmp)
// Only the latest 5 records will participate in calculation.
num=5
time = array(TIMESTAMP)
time=take(2018.01.01T09:30:01.000,num)
sym=take("A"+string(1..10),num)
price=6..10
volume=6..10
tmp=table(time, sym, price, volume)
trades1.append!(tmp)
time | factor1 | factor2 |
---|---|---|
2018.01.01T09:30:00.000 | 1.1 | 55 |
2018.01.01T09:30:00.000 | 2.1 | 55 |
2018.01.01T09:30:00.000 | 3.1 | 55 |
2018.01.01T09:30:00.000 | 4.1 | 55 |
2018.01.01T09:30:00.000 | 5.1 | 55 |
2018.01.01T09:30:00.000 | 6.1 | 55 |
2018.01.01T09:30:00.000 | 7.1 | 55 |
2018.01.01T09:30:00.000 | 8.1 | 55 |
2018.01.01T09:30:00.000 | 9.1 | 55 |
2018.01.01T09:30:00.000 | 10.1 | 55 |
2018.01.01T09:30:01.000 | 6.1 | 40 |
2018.01.01T09:30:01.000 | 7.1 | 40 |
2018.01.01T09:30:01.000 | 8.1 | 40 |
2018.01.01T09:30:01.000 | 9.1 | 40 |
2018.01.01T09:30:01.000 | 10.1 | 40 |
Example 4. Set triggeringPattern to 'interval' and triggeringInterval to 500 (milliseconds).
share streamTable(10:0,`time`sym`price`volume,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades3
share table(1:0, `time`avgPrice`volume`dollarVolume`count, [TIMESTAMP,DOUBLE,INT,DOUBLE,INT]) as outputTable
csEngine3=createCrossSectionalEngine(name="csEngineDemo3", metrics=<[avg(price), sum(volume), sum(price*volume), count(price)]>, dummyTable=trades3, outputTable=outputTable, keyColumn=`sym, triggeringPattern="interval", triggeringInterval=500)
subscribeTable(tableName="trades3", actionName="tradesStats", offset=-1, handler=append!{csEngine3}, msgAsTable=true);
insert into trades3 values(2020.08.12T09:30:00.000, `A, 10, 20)
insert into trades3 values(2020.08.12T09:30:00.000 + 500, `B, 20, 10)
insert into trades3 values(2020.08.12T09:30:00.000 + 1000, `A, 10.1, 20)
insert into trades3 values(2020.08.12T09:30:00.000 + 2000, `B, 20.1, 30)
sleep(500)
insert into trades3 values(2020.08.12T09:30:00.000 + 2500, `B, 20.2, 40)
insert into trades3 values(2020.08.12T09:30:00.000 + 3000, `A, 10.2, 20);
sleep(500)
select * from outputTable;
time | avgPrice | volume | dollarVolume | count |
---|---|---|---|---|
2022.03.02T11:17:02.341 | 15.1 | 50 | 805 | 2 |
2022.03.02T11:17:02.850 | 15.2 | 60 | 1012 | 2 |
The calculation is triggered every 500 ms based on the system time. For each key, only the record with the latest timestamp participates in the calculation, even if there are multiple uncalculated records with the same key.
In the examples above, the table returned by the cross-sectional engine serves as an intermediate result for the calculation, but it can also be the final result. For example, to regularly refresh the latest trading price of a certain stock, the basic approach is to filter the real-time trading table by stock code and retrieve the last record. However, the trading table grows rapidly over time, so frequent queries of this kind are costly in both system resources and query performance. The cross-sectional table saves only the latest transaction data of each stock, so its size is stable, which makes it well suited to scheduled polling.
To use a cross-sectional table as a final result, leave metrics and outputTable unspecified.
tradesCrossEngine=createCrossSectionalEngine(name="CrossSectionalDemo", dummyTable=trades, keyColumn=`sym, triggeringPattern=`perRow)
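A fuller sketch of this polling scenario is given below. The names tradesLatest, latestTrades, "latestTradesDemo" and "latestTradesSub" and the sample data are hypothetical. The final query assumes that the handle returned by the engine can be queried like an ordinary keyed table, as the description of the return value suggests.

```dolphindb
share streamTable(10:0, `time`sym`price`volume, [TIMESTAMP,SYMBOL,DOUBLE,INT]) as tradesLatest

// No metrics or outputTable: the engine only maintains the keyed table.
latestTrades = createCrossSectionalEngine(name="latestTradesDemo", dummyTable=tradesLatest, keyColumn=`sym, triggeringPattern=`perRow)
subscribeTable(tableName="tradesLatest", actionName="latestTradesSub", offset=-1, handler=append!{latestTrades}, msgAsTable=true)

insert into tradesLatest values(2020.08.12T09:30:00.000 + 123 234 456, `A`B`A, 10 20 10.1, 20 10 20)
// Give the subscription a moment to deliver the data to the engine.
sleep(100)

// Poll the latest record of one stock from the keyed table instead of scanning tradesLatest.
select * from latestTrades where sym=`A
```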