streamEngineParser

Syntax

streamEngineParser(name, metrics, dummyTable, outputTable, keyColumn, [timeColumn], [useSystemTime=false], [snapshotDir], [snapshotIntervalInMsgCount], [raftGroup], [filter], [keepOrder], [keyPurgeFilter], [keyPurgeFreqInSecond], [triggeringPattern='perBatch'], [triggeringInterval=1000], [lastBatchOnly=false], [contextByColumn], [updateTime], [useWindowStartTime=false], [roundTime=true], [forceTriggerTime], [sessionBegin], [sessionEnd], [mergeSessionEnd=false], [closed='left'], [fill='none'], [garbageSize], [forceTriggerSessionEndTime], [keyPurgeFreqInSec=-1])

Details

The stream engine parser provides a unified interface for computing complex metrics over stream data. These metrics often require logic involving cross-sectional computation, stateful computation, windowing calculations over time series, table joins, and more. To handle such computational tasks, multiple streaming engines need to be combined into a pipeline. With the stream engine parser, you don't need to manually define and connect engines in a specific order. Instead, you only need to rewrite your metrics following a set of rules. The stream engine parser analyzes the metric expressions and automatically constructs a computational pipeline by combining the appropriate engines.

As the stream engine parser integrates DolphinDB's time-series, reactive state, and cross-sectional engines, it provides parameters for configuring each engine. Some parameters, like keyColumn, are shared across the engines. However, if you need to set different values for the shared parameters per engine, the stream engine parser cannot be used. In these cases, you'll need to manually build a cascade of engines.

streamEngineParser returns a table object. It is the table returned by the first streaming engine in the auto-generated pipeline. Appending data to this table means that data enters the engine pipeline for processing. Note that there is no function to delete the entire pipeline at once. The engines created by the stream engine parser must be deleted individually by name (see Arguments -> name) after use.
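
For example, assuming name="test" produced a two-engine pipeline, the cleanup would be:
//engines created by the parser are named test0, test1, ... and must be dropped one by one
dropStreamEngine("test0")
dropStreamEngine("test1")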

As mentioned earlier, the parser requires metrics to be written using specific functions or syntax rules. In this way, metrics can be translated into executable logic that is mapped to the appropriate streaming engines. For metrics with user-defined functions, the stream engine parser will analyze the function body logic.

When parsing a metric, the parser applies the following rules:
  1. It looks for function signifiers indicating the engine type
    • If a metric involves cross-sectional computation, the corresponding function must be a function signifier (often a function with the row- prefix). For example, to compute the sorted order of elements in each row, the rowRank function must be used to implement this layer of logic.

      Supported function signifiers for cross-sectional computation include:

      "byRow", "rowAvg", "rowCount", "count", "rowDenseRank",
      "rowMax", "rowMin", "rowProd", "rowRank", "rowSize", "rowSkew", "rowStd", "rowStdp", "rowSum", "rowSum2",
      "rowVar", "rowVarp", "rowAnd", "rowOr", "rowXor", "rowKurtosis", "rowCorr", "rowCovar",
      "rowBeta", "rowWsum", "rowWavg"

      For logic that cannot be expressed with row- functions, or that must be expressed with user-defined functions, you can use the byRow higher-order function.

    • If a metric involves windowing aggregation over time series, use the rolling higher-order function. For example, to calculate a rolling 3-element sum stepped by 3 elements on the "price" column using forward filling, specify rolling(sum, price, 3, 3, `ffill`).

      Syntax of rolling:
      rolling(func, funcArgs, window, [step=1], [fill='none'])
    • Built-in functions without the above function signifiers will be processed by the reactive state engine.

  2. Input table schema for intermediate engines

    streamEngineParser allows you to specify the schema for only the input table of the first parsed engine through the dummyTable parameter. The intermediate processing steps are abstracted away from the user. The stream engine parser uses specific naming conventions for the dummyTables of intermediate engines. If you need to reference a column from an intermediate engine's dummyTable in metrics or other parameters, follow these naming conventions:

    • The first column of any intermediate dummyTable is the time column.
    • The next columns are the ones specified by keyColumn.
    • After that come the columns holding the metric calculation results. These metric columns are named col_0_, col_1_, col_2_, and so on.
Note:
  1. The output tables of time-series engines and cross-sectional engines both contain a time column, but reactive state engines do not output one. When the pipeline generates a reactive state engine, the parser automatically adds a time column to that engine's metrics so that the time column is included in the dummyTable passed to the next engine in the pipeline. In this scenario, if useSystemTime is true, the auto-added time column is named "datetime".
  2. Certain parameters for cross-sectional engines and reactive state engines may need column names from the intermediate dummyTable. For example:
    • contextByColumn of cross-sectional engines
    • filter and keyPurgeFilter of reactive state engines

    To specify columns from an intermediate dummyTable, follow the aforementioned naming conventions, e.g., contextByColumn="col_0_".

    To specify the time column or a key column, simply use the column name.
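
To illustrate these rules, below is a sketch of two metrics over a hypothetical price column and the engines each layer maps to:
//rowRank is a cross-sectional signifier; mavg is stateful with no signifier, so this
//metric spans a reactive state engine feeding a cross-sectional engine. Inside the
//pipeline, the reactive state engine's result appears in the next engine's dummyTable
//as col_0_, following the naming conventions above.
metrics1 = <[rowRank(mavg(price, 10), percent=true)]>
//rolling maps to a time-series engine: a 3-element sum stepped by 3 elements
metrics2 = <[rolling(sum, price, 3, 3)]>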

Required Arguments

name is a string specifying the prefix for the names of the engines in the parser. It can contain letters, numbers, and underscores, but must start with a letter. For example, if name = "test", the engine names will be "test0", "test1", "test2", etc. The number appended to each name indicates the engine's position within the parser, which is determined by the parsing results of metrics.

metrics is metacode indicating the metrics to be computed by the parser. Tuples are supported. For more information, see Functional Metaprogramming.
  • metrics can include built-in or user-defined functions, like <[sum(qty), avg(price)]>; expressions, like <[avg(price1)-avg(price2)]>; computation on columns, like <[std(price1-price2)]>.

  • Functions that return multiple values are also supported, such as <func(price) as col1 col2> (column names are optional).

The metrics are typically complex, with multiple layers of computation logic. The parser analyzes each layer of logic and assigns it to the appropriate streaming engine (see Details).
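
For instance, the forms above can be combined into a single tuple of metacode:
metrics = <[sum(qty), avg(price1) - avg(price2), std(price1 - price2)]>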

dummyTable is a table object. It must have the same schema as the stream table to which the engine subscribes. It does not matter whether the table contains data.
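
For example, a minimal dummyTable for a hypothetical stream of sym/time/price records:
dummy = table(1:0, `sym`time`price, [SYMBOL, TIMESTAMP, DOUBLE])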

outputTable holds the computation results from the engine pipeline. It can be an in-memory table or a DFS table, but must be empty before calling streamEngineParser. The column names and data types in outputTable must be specified upfront. The schema of outputTable depends on the last engine in the pipeline:
  • For a time-series engine, the columns are ordered as:

    1. Time column

      1. The column type is either TIMESTAMP (when useSystemTime = true) or the same type as timeColumn (when useSystemTime = false).

      2. It displays window start (when useWindowStartTime = true) or end times (when useWindowStartTime = false).

    2. Key column(s), if keyColumn is specified. They are ordered the same as in keyColumn.

    3. Metric computation result column(s).

  • For a reactive state engine, the columns are ordered as:

    1. Key column(s), ordered the same as in keyColumn.

    2. Metric computation result column(s).

  • For a cross-sectional engine:

    • If contextByColumn is not specified, the columns of the output table are ordered as:

      1. A TIMESTAMP column with system timestamps for each computation (or the corresponding values from the timeColumn, if this parameter is specified).

      2. Metric computation result column(s). The data types must be consistent with the metric computation results.

    • If contextByColumn is specified, the columns of the output table are ordered as:

      1. A TIMESTAMP column with system timestamps for each computation (or the corresponding values from the timeColumn, if this parameter is specified).

      2. The column specified in contextByColumn.

      3. Metric computation result column(s). The data types must be consistent with the metric computation results.
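
For instance, a sketch of an output table for a pipeline ending with a time-series engine keyed on a hypothetical sym column (time column first, then the key column, then one column per metric result):
out = table(10000:0, `time`sym`sumPrice, [TIMESTAMP, SYMBOL, DOUBLE])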

keyColumn is a string scalar or vector.
  • For time-series engines and reactive state engines, keyColumn indicates the columns to group data by. If specified, computations are done per group, such as aggregations by stock.

  • For cross-sectional engines, keyColumn indicates the column holding the unique keys. The engine will use only the latest record per key for each computation.

Optional Arguments

timeColumn specifies the time column in the stream table to which the engine subscribes. It is required when useSystemTime = false.
  • For time-series engines and reactive state engines, timeColumn can be a string scalar or a 2-element string vector. The vector specifies a DATE column and a TIME/SECOND/NANOTIME column; the engine concatenates the date and time values into a single time value for the output table (see the sketch after this list). See concatDateTime() for the concatenated data type.

  • For cross-sectional engines, timeColumn is a string. The specified column must be of TIMESTAMP type.
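
For example, a sketch of the two-column form for time-series and reactive state engines (hypothetical column names):
dummy = table(1:0, `sym`tradeDate`tradeTime`price, [SYMBOL, DATE, TIME, DOUBLE])
//pass timeColumn=`tradeDate`tradeTime; the output time column takes the concatenated type returned by concatDateTime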

useSystemTime indicates how the engine handles time when processing data.
  • For cross-sectional engines, useSystemTime is an optional boolean parameter. If true (default), the engine will use the system time when each calculation starts and output it in the first column of the output table. If false, it will use the time values from the specified timeColumn instead.

  • For time-series engines, useSystemTime is an optional parameter specifying how data is windowed for processing:

    • When useSystemTime = true, the engine will regularly window the streaming data at fixed time intervals for calculations according to the ingestion time (local system time with millisecond precision, independent of any time columns in the stream table) of each record. As long as a window contains data, the calculation will be performed automatically when the window ends. The first column in the output table indicates the timestamp when each calculation occurred. Note: When useSystemTime = true, timeColumn must be left empty.

    • When useSystemTime = false (default), the engine will window the streaming data according to the timeColumn in the stream table. The calculation for a window is triggered by the first record arriving after the window ends. Note that the record which triggers the calculation does not participate in that calculation.
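
A minimal sketch of the two windowing modes, assuming hypothetical sym/time/price data and a simple rolling sum:
//event-time windows (default): windows are cut on timeColumn values
dummy1 = table(1:0, `sym`time`price, [SYMBOL, TIMESTAMP, DOUBLE])
out1 = table(10000:0, `time`sym`sumPrice, [TIMESTAMP, SYMBOL, DOUBLE])
e1 = streamEngineParser(name=`evtDemo, metrics=<[rolling(sum, price, 3, 3)]>, dummyTable=dummy1, outputTable=out1, keyColumn=`sym, timeColumn=`time, useSystemTime=false)
//system-time windows: timeColumn must be left empty
dummy2 = table(1:0, `sym`price, [SYMBOL, DOUBLE])
out2 = table(10000:0, `time`sym`sumPrice, [TIMESTAMP, SYMBOL, DOUBLE])
e2 = streamEngineParser(name=`sysDemo, metrics=<[rolling(sum, price, 100, 100)]>, dummyTable=dummy2, outputTable=out2, keyColumn=`sym, useSystemTime=true)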

To enable snapshots for the streaming engines, specify the parameters snapshotDir and snapshotIntervalInMsgCount.

snapshotDir is a string indicating the directory where the streaming engine snapshot is saved.
  • The directory must already exist, otherwise an exception is thrown.

  • If snapshotDir is specified, the system checks whether a snapshot already exists in the directory when creating a streaming engine. If it exists, the snapshot will be loaded to restore the engine state.

  • Multiple streaming engines can share a directory; each snapshot file is named after its engine.

  • A snapshot file name takes one of the following forms:

    • <engineName>.tmp: temporary snapshot

    • <engineName>.snapshot: a snapshot that is generated and flushed to disk

    • <engineName>.old: if a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.

snapshotIntervalInMsgCount is a positive integer indicating the number of messages to receive before the next snapshot is saved.
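
A minimal sketch enabling snapshots, assuming the directory /home/ddb/snapshots already exists (engine and column names are hypothetical):
dummy = table(1:0, `sym`time`price, [SYMBOL, TIMESTAMP, DOUBLE])
out = table(10000:0, `time`sym`sumPrice, [TIMESTAMP, SYMBOL, DOUBLE])
engine = streamEngineParser(name=`snapDemo, metrics=<[rolling(sum, price, 3, 3)]>, dummyTable=dummy, outputTable=out, keyColumn=`sym, timeColumn=`time, snapshotDir="/home/ddb/snapshots", snapshotIntervalInMsgCount=1000)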

raftGroup is an integer greater than 1, indicating the ID of the raft group on the high-availability streaming subscriber specified by the configuration parameter streamingRaftGroups. Specify raftGroup to enable high availability on the streaming engine. When an engine is created on the leader, it is also created on each follower and the engine snapshot is synchronized to the followers. When a leader is down, the raft group automatically switches to a new leader to resubscribe to the stream table. Note that snapshotDir must also be specified when specifying a raft group.

The stream engine parser must be created on the leader of a raft group.

Below are the parameters unique to a specific type of streaming engine:
  • reactive state engines (createReactiveStateEngine): filter, keepOrder, keyPurgeFilter, keyPurgeFreqInSecond

  • cross-sectional engines (createCrossSectionalEngine): triggeringPattern, triggeringInterval, lastBatchOnly, contextByColumn

  • time-series engines (createTimeSeriesEngine): updateTime, useWindowStartTime, roundTime, forceTriggerTime, closed, fill, garbageSize, keyPurgeFreqInSec

  • daily time-series engines (createDailyTimeSeriesEngine): sessionBegin, sessionEnd, mergeSessionEnd, closed, fill, garbageSize, forceTriggerSessionEndTime, keyPurgeFreqInSec

Note: If triggeringPattern='keyCount', parameter keepOrder must be set to true.
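
For example, a sketch passing engine-specific parameters through the parser (hypothetical names), with keepOrder=true paired with triggeringPattern='keyCount' as required by the note above:
dummy = table(1:0, `sym`time`price, [SYMBOL, TIMESTAMP, DOUBLE])
out = table(10000:0, `time`sym`factor, [TIMESTAMP, SYMBOL, DOUBLE])
engine = streamEngineParser(name=`kcDemo, metrics=<[sym, rowRank(mavg(price, 10), percent=true)]>, dummyTable=dummy, outputTable=out, keyColumn=`sym, timeColumn=`time, triggeringPattern='keyCount', triggeringInterval=3000, keepOrder=true)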

Examples

This example implements Formula #1 from WorldQuant's 101 Alpha Factors on streaming data. The rank function performs a cross-sectional operation, so it is processed with a cross-sectional engine. The arguments to rank are computed by a reactive state engine, which outputs to the cross-sectional engine.
n = 100
sym = rand(`aaa`bbb`ccc, n)
time = 2021.01.01T13:30:10.008 + 1..n
maxIndex=rand(100.0, n)
data = table(sym as sym, time as time, maxIndex as maxIndex)
//Alpha#001 formula: rank(Ts_ArgMax(SignedPower((returns<0?stddev(returns,20):close), 2), 5))-0.5

//create a cross-sectional engine to calculate the rank of each stock
dummy = table(1:0, `sym`time`maxIndex, [SYMBOL, TIMESTAMP, DOUBLE])
resultTable = streamTable(10000:0, `time`sym`factor1, [TIMESTAMP, SYMBOL, DOUBLE])
ccsRank = createCrossSectionalEngine(name="alpha1CCS", metrics=<[sym, rank(maxIndex, percent=true) - 0.5]>, dummyTable=dummy, outputTable=resultTable, keyColumn=`sym, triggeringPattern='keyCount', triggeringInterval=3000, timeColumn=`time, useSystemTime=false)

@state
def wqAlpha1TS(close){
    ret = ratios(close) - 1
    v = iif(ret < 0, mstd(ret, 20), close)
    return mimax(signum(v)*v*v, 5)
}

//create a reactive state engine which outputs to the cross-sectional engine
input = table(1:0, `sym`time`close, [SYMBOL, TIMESTAMP, DOUBLE])
rse = createReactiveStateEngine(name="alpha1", metrics=<[time, wqAlpha1TS(close)]>, dummyTable=input, outputTable=ccsRank, keyColumn="sym")
rse.append!(data)

dropStreamEngine("alpha1CCS")
dropStreamEngine("alpha1")
The same computation can be done using streamEngineParser:
input = table(1:0, `sym`time`close, [SYMBOL, TIMESTAMP, DOUBLE])
resultTable = streamTable(10000:0, `time`sym`factor1, [TIMESTAMP, SYMBOL, DOUBLE])

//construct metrics
metrics = <[sym, rowRank(wqAlpha1TS(close), percent=true) - 0.5]>

streamEngine=streamEngineParser(name=`alpha1_parser, metrics=metrics, dummyTable=input, outputTable=resultTable, keyColumn=`sym, timeColumn=`time, triggeringPattern='keyCount', triggeringInterval=3000)
streamEngine.append!(data)

dropStreamEngine("alpha1_parser0")
dropStreamEngine("alpha1_parser1")