CrossSectionalEngine

class swordfish._swordfishcpp.CrossSectionalEngine

The cross-sectional streaming engine is used for real-time computation on cross-sectional data: a collection of observations (behaviors) of multiple subjects (entities such as different stocks) at a single point in time.

CrossSectionalEngine.create returns a builder object; calling submit() on it creates the engine, which maintains a keyed table with the key_col column(s) as the key. The keyed table is updated every time a new record arrives. If last_batch_only is set to True, the table keeps only the records with the latest timestamp. When new data is ingested into the engine:

  • If metrics and output are specified, the engine first updates the keyed table, then performs calculations on the latest data and outputs the results to output.

  • If metrics and output are not specified, the engine only updates the keyed table.
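The update-then-compute order described above can be illustrated with a small pure-Python model. This is a conceptual sketch, not the swordfish API: the class KeyedTableModel, its append method, and the use of a plain Python callable as the metric are hypothetical, and each append call stands for one incoming batch (as with the default "per_batch" trigger).

```python
from typing import Any, Callable, Dict, List, Optional

Record = Dict[str, Any]


class KeyedTableModel:
    """Hypothetical model of the engine's keyed table (not the swordfish API)."""

    def __init__(
        self,
        key_col: str,
        metrics: Optional[Callable[[List[Record]], Any]] = None,
        output: Optional[List[Any]] = None,
    ) -> None:
        self.key_col = key_col
        self.metrics = metrics
        self.output = output
        # One entry per key; a newer record replaces the older one.
        self.table: Dict[Any, Record] = {}

    def append(self, batch: List[Record]) -> None:
        # Step 1: always update the keyed table.
        for rec in batch:
            self.table[rec[self.key_col]] = rec
        # Step 2: calculate only if both metrics and output were given.
        if self.metrics is not None and self.output is not None:
            self.output.append(self.metrics(list(self.table.values())))


def avg_price(rows: List[Record]) -> float:
    # Cross-sectional average over the latest record of every key.
    return sum(r["price"] for r in rows) / len(rows)


results: List[float] = []
engine = KeyedTableModel("symbol", metrics=avg_price, output=results)
engine.append([{"symbol": "A", "price": 10.0}, {"symbol": "B", "price": 20.0}])
engine.append([{"symbol": "A", "price": 12.0}])  # replaces A's earlier record
print(results)  # [15.0, 16.0]

# Without metrics/output, only the keyed table is updated.
table_only = KeyedTableModel("symbol")
table_only.append([{"symbol": "A", "price": 10.0}])
print(table_only.table)  # {'A': {'symbol': 'A', 'price': 10.0}}
```

The second result is 16.0 rather than 14.0 because the model, like the keyed table, averages only the latest record per key (A = 12.0, B = 20.0), not every record seen so far.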

Calculations can be triggered by the number of records or by a time interval; see the create parameters triggering_pattern and triggering_interval. If context_by_col is specified, the data is grouped by the specified columns and the calculations are performed within each group.
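To make the difference between record-count triggers and grouping concrete, the following pure-Python sketch models only the "per_batch" and "per_row" patterns and a single context_by column. The function run_model and its averaging metric are hypothetical illustrations, not part of swordfish; the time-based patterns are deliberately left out.

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional

Record = Dict[str, Any]


def run_model(
    batches: List[List[Record]],
    key_col: str,
    value_col: str,
    pattern: str = "per_batch",
    context_by: Optional[str] = None,
) -> List[Dict[Any, float]]:
    """Return one {group: average} dict per triggered calculation."""
    table: Dict[Any, Record] = {}  # latest record per key
    results: List[Dict[Any, float]] = []

    def calculate() -> Dict[Any, float]:
        # Group the latest records by context_by (a single group if None).
        groups: Dict[Any, List[float]] = defaultdict(list)
        for rec in table.values():
            group = rec[context_by] if context_by else None
            groups[group].append(rec[value_col])
        return {g: sum(v) / len(v) for g, v in groups.items()}

    for batch in batches:
        if pattern == "per_row":
            # One calculation per incoming record.
            for rec in batch:
                table[rec[key_col]] = rec
                results.append(calculate())
        elif pattern == "per_batch":
            # One calculation per incoming batch.
            for rec in batch:
                table[rec[key_col]] = rec
            results.append(calculate())
        else:
            raise ValueError("this sketch models only per_batch and per_row")
    return results


batch = [
    {"symbol": "A", "sector": "tech", "price": 10.0},
    {"symbol": "B", "sector": "tech", "price": 20.0},
    {"symbol": "C", "sector": "bank", "price": 5.0},
]
print(run_model([batch], "symbol", "price", "per_batch", "sector"))
# [{'tech': 15.0, 'bank': 5.0}]
print(run_model([batch], "symbol", "price", "per_row", "sector"))
# [{'tech': 10.0}, {'tech': 15.0}, {'tech': 15.0, 'bank': 5.0}]
```

The same three records yield one result under "per_batch" and three under "per_row", and each result holds one value per sector group.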

The snapshot mechanism restores the streaming engine to its latest snapshot after a system interruption (see the create parameters snapshot_dir and snapshot_interval_in_msg_count).
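The idea behind snapshot_dir and snapshot_interval_in_msg_count can be sketched in plain Python: state is persisted after every N messages and reloaded on restart. This is a hypothetical model (the class name, the JSON file format, and the write-then-rename step are assumptions for illustration), not how swordfish stores its snapshots. In this model, messages received after the last snapshot are missing from the restored state and would have to be replayed from upstream.

```python
import json
import os
import tempfile
from typing import Dict


class SnapshottingModel:
    """Hypothetical model: keyed state saved every N messages."""

    def __init__(self, snapshot_path: str, interval_in_msg_count: int) -> None:
        self.snapshot_path = snapshot_path
        self.interval = interval_in_msg_count
        self.table: Dict[str, float] = {}
        self.msg_count = 0
        self._restore()

    def _restore(self) -> None:
        # On start-up, reload the latest snapshot if one exists.
        if os.path.exists(self.snapshot_path):
            with open(self.snapshot_path) as f:
                state = json.load(f)
            self.table = state["table"]
            self.msg_count = state["msg_count"]

    def _save(self) -> None:
        # Write to a temporary file, then atomically replace the old snapshot.
        tmp_path = self.snapshot_path + ".tmp"
        with open(tmp_path, "w") as f:
            json.dump({"table": self.table, "msg_count": self.msg_count}, f)
        os.replace(tmp_path, self.snapshot_path)

    def append(self, key: str, value: float) -> None:
        self.table[key] = value
        self.msg_count += 1
        if self.msg_count % self.interval == 0:
            self._save()


with tempfile.TemporaryDirectory() as snapshot_dir:
    path = os.path.join(snapshot_dir, "engine_snapshot.json")
    engine = SnapshottingModel(path, interval_in_msg_count=2)
    engine.append("A", 1.0)
    engine.append("B", 2.0)  # 2nd message: snapshot written
    engine.append("A", 3.0)  # 3rd message: not yet in any snapshot
    restored = SnapshottingModel(path, interval_in_msg_count=2)  # simulated restart

print(restored.table, restored.msg_count)  # {'A': 1.0, 'B': 2.0} 2
```

A smaller interval loses fewer messages on restart at the cost of writing snapshots more often.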

engine_type: EngineType

The type of the streaming engine.

stat: CrossSectionalEngineStat

Descriptive statistics related to the streaming engine.

classmethod create(name, table_schema, key_col, *, metrics=None, output=None, triggering_pattern='per_batch', triggering_interval=None, use_system_time=True, time_col=None, last_batch_only=False, context_by_col=None, snapshot_dir=None, snapshot_interval_in_msg_count=None, output_elapsed_microseconds=False, round_time=True, key_filter=None, updated_context_groups_only=False)

Creates a builder for a cross-sectional streaming engine with the specified parameters and configuration; the engine itself is created when submit() is called on the builder.

Parameters:
  • name (str) – The name of the engine. It can contain letters, numbers and “_” and must start with a letter.

  • table_schema (Union[Table, TypeDict]) – Specifies the column names and corresponding types of the input stream. If a Table is provided, its schema must match the schema of the subscribed stream table; the table may be empty or contain data.

  • key_col (Union[List[str], str]) – One or more columns of the stream table to use as key columns. For each key, only the latest record is used in the calculation.

  • metrics (optional) – The formulas to calculate, given as MetaCode or an AnyVector. Defaults to None.

  • output (Table, optional) –

    The output table for the results. It can be an in-memory table or a DFS table. Create an empty table and specify its column names and types before calling create, making sure the column types match the results of the corresponding metrics. Defaults to None. The columns of the output table are in the following order:

    • The first column is of TIMESTAMP type.

      • If use_system_time = True, the column stores the time when each calculation starts.

      • If use_system_time = False, it takes the values of time_col.

    • The next column(s) hold the context_by_col column(s), if specified.

    • If output_elapsed_microseconds is set to True, specify two more columns: a LONG column for the elapsed time of each batch (in microseconds) and an INT column for the number of records in each batch.

    • The remaining columns store the calculation results of metrics.

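  • triggering_pattern (Literal["per_batch", "per_row", "interval", "key_count", "data_interval"], optional) – Specifies how to trigger the calculations. Defaults to "per_batch".

    • "per_batch": calculates when a batch of data arrives.

    • "per_row": calculates when a new record arrives.

    • "interval": calculates at the interval specified by triggering_interval, based on system time.

    • "key_count": for data with the same timestamp arriving in batches, calculates when the number of keys with the latest timestamp reaches triggering_interval, or when data with a newer timestamp arrives.

    • "data_interval": calculates at intervals based on the timestamps in time_col; requires use_system_time = False.

  • triggering_interval (Any, optional) – The triggering interval; its meaning depends on triggering_pattern. With "interval", it is the time in milliseconds between two adjacent calculations; with "key_count", it is a number of keys; with "data_interval", it is measured in the units of time_col. Defaults to None.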

  • use_system_time (bool, optional) – Whether the calculations are performed based on the system time when data is ingested into the engine. Defaults to True.

  • time_col (Optional[str], optional) – The time column in the stream table to which the engine subscribes if use_system_time = False. Defaults to None.

  • last_batch_only (bool, optional) – Whether to keep only the records with the latest timestamp in the engine. Defaults to False.

  • context_by_col (Optional[Union[List[str], str]], optional) – The grouping column(s) by which calculations are performed within groups. Only takes effect if metrics and output are specified. Defaults to None.

  • snapshot_dir (Optional[Union[Path, str]], optional) – The directory where the streaming engine snapshot is saved. Defaults to None.

  • snapshot_interval_in_msg_count (Optional[int], optional) – The number of messages to receive before saving the next snapshot. Defaults to None.

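  • output_elapsed_microseconds (bool, optional) – Whether to output the elapsed time (in microseconds) and the number of records of each batch. Defaults to False.

  • round_time (bool, optional) – How to align the window boundary when the time precision is seconds or milliseconds and the step is greater than one minute: True aligns by the multi-minute rule, False by the one-minute rule. Only takes effect when triggering_pattern is "data_interval". Defaults to True.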

  • key_filter (Optional[MetaCode], optional) – The conditions for filtering keys in the keyed table returned by the engine. Defaults to None.

  • updated_context_groups_only (bool, optional) – Whether to compute only the groups updated with new data since the last output. Defaults to False.
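The column order required for output can be captured in a small helper. The sketch below is a hypothetical illustration, not part of swordfish: output_schema, the default names "time", "elapsed_us", and "batch_size", the "SYMBOL" type for the grouping column, and the reading of the INT column as a per-batch record count are all assumptions; only the order and the TIMESTAMP, LONG, and INT types come from the description of output.

```python
from typing import List, Optional, Sequence, Tuple

Column = Tuple[str, str]  # (column name, type name)


def output_schema(
    metric_cols: Sequence[Column],
    context_by_cols: Optional[Sequence[Column]] = None,
    output_elapsed_microseconds: bool = False,
    time_name: str = "time",
) -> List[Column]:
    """Return output-table columns in the order described for `output`."""
    # 1. Calculation time (system time or time_col values).
    cols: List[Column] = [(time_name, "TIMESTAMP")]
    # 2. Grouping column(s), if context_by_col is used.
    if context_by_cols:
        cols.extend(context_by_cols)
    # 3. Elapsed time (LONG) and, assumed here, the batch record count (INT).
    if output_elapsed_microseconds:
        cols.extend([("elapsed_us", "LONG"), ("batch_size", "INT")])
    # 4. One column per metric, in the same order as metrics.
    cols.extend(metric_cols)
    return cols


schema = output_schema(
    [("avg_price", "DOUBLE"), ("total_volume", "LONG")],
    context_by_cols=[("sector", "SYMBOL")],
    output_elapsed_microseconds=True,
)
print([name for name, _ in schema])
# ['time', 'sector', 'elapsed_us', 'batch_size', 'avg_price', 'total_volume']
```

Building the layout in one place keeps the metric columns aligned with the order of metrics, which is where mismatched output types most often come from.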

Returns:

An instance of CrossSectionalEngineBuilder that allows further configuration of the cross-sectional engine, including its optional parameters. Call submit() on the builder to create the engine.

Return type:

CrossSectionalEngineBuilder

Examples

>>> import swordfish as sf
>>> table_schema = {"timestamp": "DATETIME", "symbol": "STRING",
...     "price": "DOUBLE", "volume": "LONG"}
>>> # The first output column must be TIMESTAMP; with use_system_time=True
>>> # it receives the start time of each calculation.
>>> output_table = sf.table(types={"time": "TIMESTAMP", "avg_price": "DOUBLE",
...     "total_volume": "LONG"})
>>> # time_col is omitted because it is only used when use_system_time=False.
>>> my_engine = sf.engine.CrossSectionalEngine.create(
...     name="StockAnalysisEngine",
...     table_schema=table_schema,
...     key_col="symbol",
...     metrics=["avg(price)", "sum(volume)"],
...     output=output_table,
...     triggering_pattern="interval",
...     triggering_interval=10,
...     use_system_time=True,
...     last_batch_only=False,
...     snapshot_dir="/path/to/snapshot",
...     snapshot_interval_in_msg_count=1000,
...     round_time=True,
...     updated_context_groups_only=True
... ).submit()