DStream::crossSectionalEngine

Syntax

DStream::crossSectionalEngine(metrics, keyColumn, [triggeringPattern='perBatch'], [triggeringInterval=1000], [useSystemTime=true], [timeColumn], [lastBatchOnly=false], [contextByColumn], [roundTime=true], [keyFilter])

Details

Creates a cross-sectional streaming engine. For details, see createCrossSectionalEngine.

Return value: A DStream object.

Arguments

metrics (optional) is metacode or a tuple specifying the formulas for calculation. It can be:
  • Built-in or user-defined aggregate functions, e.g., <[sum(qty), avg(price)]>; Or expressions on previous results, e.g., <[avg(price1)-avg(price2)]>; Or calculation on multiple columns, e.g., <[std(price1-price2)]>

  • Functions with multiple returns, such as <func(price) as `col1`col2>. The column names can be specified or not. For more information about metacode, see Metaprogramming.

  • The column names specified in metrics are not case-sensitive and can be inconsistent with the column names of the input tables.

keyColumn is a STRING scalar or vector that specifies one or more columns in the stream table as the key columns. For each unique value in the keyColumn, only the latest record is used in the calculation.

triggeringPattern (optional) is a STRING scalar specifying how to trigger the calculations. The engine returns a result every time a calculation is triggered. It can take one of the following values:
  • 'perBatch' (default): calculates when a batch of data arrives.

  • 'perRow': calculates when a new record arrives.

  • 'interval': calculates at intervals specified by triggeringInterval, using system time.

  • 'keyCount': When data with the same timestamp arrives in batches, the calculation is triggered when:

    • if the number of keys with the latest timestamp reaches triggeringInterval;

    • or data with newer timestamp arrives.

  • 'dataInterval': calculates at intervals based on timestamps in the data. To use this, timeColumn must be specified and useSystemTime must be false.

Note: To set triggeringPattern as 'keyCount', timeColumn must be specified and useSystemTime must be set to false. In this case, the out-of-order data will be discarded.
triggeringInterval (optional) can be an integer or a tuple. Below explains its optional values and triggering rules:
  • If triggeringPattern = 'interval', triggeringInterval is a positive integer indicating the interval in milliseconds between 2 adjacent calculations. The default value is 1,000. Every triggeringInterval milliseconds, the system checks if the data in the engine has been calculated; if not, a calculation is triggered.

  • If triggeringPattern = 'keyCount', triggeringInterval can be:

    • an integer specifying a threshold. Before data with a greater timestamp arrives, a calculation is triggered when the number of uncalculated records reaches the threshold.

    • a tuple of 2 elements. The first element is an integer indicating the threshold of the number records with the latest timestamp to trigger calculation. The second element is an integer or duration value.

    For example, when triggeringInterval is set to (c1, c2):

    • If c2 is an integer and the number of keys with the latest timestamp t1 doesn't reach c1, calculation will not be triggered and the system goes on to save data with greater timestamp t2 (t2>t1). Data with t1 will be calculated when either of the events happens: the number of keys with timestamp t2 reaches c2, or data with greater timestamp t3 (t3>t2) arrives. Note that c2 must be smaller than c1.

    • If c2 is a duration and the number of keys with the latest timestamp t1 doesn't reach c1, calculation will not be triggered and the system goes on to save data with greater timestamp t2 (t2>t1) . Once data with t2 starts to come in, data with t1 will not be calculated until any of the events happens: the number of keys with timestamp t1 reaches c1, or data with greater timestamp t3 (t3>t2) arrives, or the duration c2 comes to an end.

  • If triggeringPattern= 'dataInterval', triggeringInterval is a positive integer measured in the same units as the timestamps in timeColumn. The default is 1,000. Starting with the first record, a window is started at intervals defined by triggeringInterval, based on the timestamp units. When the first record of the next window arrives, a calculation is triggered on all data in the current window.

    • In versions 2.00.11.2 and earlier, a calculation is triggered for each window.

    • Since version 2.00.11.3, a calculation is triggered only for windows containing data.

useSystemTime (optional) is a Boolean value indicating whether the calculations are performed based on the system time when data is ingested into the engine.
  • If useSystemTime = true, the time column of outputTable is the system time;

  • If useSystemTime = false, the parameter timeColumn must be specified. The time column of outputTable uses the timestamp of each record.

timeColumn (optional) is a STRING scalar which specifies the time column in the stream table to which the engine subscribes if useSystemTime = false. It can only be of TIMESTAMP type.

lastBatchOnly (optional) is a Boolean value indicating whether to keep only the records with the latest timestamp in the engine. When lastBatchOnly = true, triggeringPattern must take the value 'keyCount', and the cross-sectional engine only maintains key values with the latest timestamp for calculation. Otherwise, the engine updates and retains all values for calculation.

contextByColumn (optional) is a STRING scalar or vector indicating the grouping column(s) based on which calculations are performed by group. This parameter only takes effect if metrics and outputTable are specified. If metrics only contains aggregate functions, the calculation results would be the same as a SQL query using group by. Otherwise, the results would be consistent with that using context by.

roundTime (optional) is a Boolean value indicating the method to align the window boundary when triggeringPattern='dataInterval'. The default value is true indicating the alignment is based on the multi-minute rule (see the alignment rules of time-series engine). False means alignment is based on the one-minute rule.

keyFilter (optional) is metacode of an expression or function call that returns a Boolean vector. It specifies the conditions for filtering keys in the keyed table returned by the engine. Only data with keys satisfying the filtering conditions will be taken for calculation.

Examples

if (!existsCatalog("orca")) {
	createCatalog("orca")
}
go
use catalog orca

// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('cs')

csGraph = createStreamGraph("cs")
csGraph.source("trade1", 1000:0, `time`sym`price`volume,[TIMESTAMP,SYMBOL,DOUBLE,INT])
  .crossSectionalEngine(metrics=<[avg(price), sum(volume), sum(price*volume), count(price)]>, keyColumn=`sym)
  .sink("cs_output")
csGraph.submit()
go

times=2020.08.12T09:30:00.000 + 123 234 456 678 890 901
syms=`A`B`A`B`B`A
prices=10 20 10.1 20.1 20.2 10.2
volumes=20 10 20 30 40 20
tmp=table(times as time, syms as sym, prices as price, volumes as volume)

appendOrcaStreamTable("trade1", tmp)

select * from orca_table.cs_output;