ReactiveStateEngineBuilder
- class swordfish._engine.ReactiveStateEngineBuilder(name, table_schema, output, metrics, *, key_col=None, filter=None, snapshot_dir=None, snapshot_interval_in_msg_count=None, keep_order=None, key_purge_filter=None, key_purge_freq_in_second=None, output_elapsed_microseconds=False, key_capacity=1024, parallelism=1, output_handler=None, msg_as_table=False)
- Parameters:
name (str)
output (Table)
key_col (List[str] | str | None)
filter (MetaCode | None)
snapshot_dir (Path | str | None)
snapshot_interval_in_msg_count (int | None)
keep_order (bool | None)
key_purge_filter (MetaCode | None)
key_purge_freq_in_second (int | None)
output_elapsed_microseconds (bool)
key_capacity (int)
parallelism (int)
output_handler (FunctionDef | None)
msg_as_table (bool)
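Each setter documented below returns the instance itself, so configuration calls can be chained and finished with submit(). The sketch below reproduces only that fluent pattern in plain Python. ToyEngineBuilder and the dictionary its submit() returns are hypothetical stand-ins, not the swordfish API, and the sketch does not show how a ReactiveStateEngineBuilder is obtained in swordfish.

```python
from typing import Any, Dict, List, Optional, Union


class ToyEngineBuilder:
    """Hypothetical stand-in that mimics the fluent style of ReactiveStateEngineBuilder."""

    def __init__(self, name: str) -> None:
        self._name = name
        # Defaults mirror the ones listed in the signature above.
        self._config: Dict[str, Any] = {"key_col": None, "key_capacity": 1024, "parallelism": 1}

    def key_col(self, val: Optional[Union[List[str], str]] = None) -> "ToyEngineBuilder":
        self._config["key_col"] = val
        return self  # returning self is what makes chaining possible

    def key_capacity(self, val: int = 1024) -> "ToyEngineBuilder":
        if val <= 0:
            raise ValueError("key_capacity must be a positive integer")
        self._config["key_capacity"] = val
        return self

    def parallelism(self, val: int = 1) -> "ToyEngineBuilder":
        self._config["parallelism"] = val
        return self

    def submit(self) -> Dict[str, Any]:
        # A real builder would create the engine; this one only returns the collected settings.
        return {"name": self._name, **self._config}


engine = ToyEngineBuilder("demo").key_col("sym").key_capacity(4096).parallelism(2).submit()
print(engine)  # {'name': 'demo', 'key_col': 'sym', 'key_capacity': 4096, 'parallelism': 2}
```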
- key_col(val=None)
Specifies the grouping column(s) for the calculation.
The calculation is conducted within each group defined by the specified column(s).
- Parameters:
val (Optional[Union[List[str], str]], optional) – The column(s) to group by. Defaults to None.
- Returns:
The instance itself.
- Return type:
Self
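To show what grouping by key_col means for a stateful (reactive) calculation, the sketch below keeps a separate running state for each key and emits one result per incoming row. It uses plain Python dictionaries. The cumulative-sum metric and the (key, value) row layout are illustrative assumptions, not something the engine computes by default.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def grouped_cumsum(rows: Iterable[Tuple[str, float]]) -> List[Tuple[str, float]]:
    """Emit (key, running_total) for every input row, keeping one running total per key."""
    state: Dict[str, float] = defaultdict(float)  # per-group state, indexed by the grouping column
    out: List[Tuple[str, float]] = []
    for key, value in rows:
        state[key] += value  # only this group's state is updated
        out.append((key, state[key]))
    return out


ticks = [("A", 1.0), ("B", 10.0), ("A", 2.0), ("B", 5.0), ("A", 3.0)]
print(grouped_cumsum(ticks))  # [('A', 1.0), ('B', 10.0), ('A', 3.0), ('B', 15.0), ('A', 6.0)]
```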
- filter(val=None)
Specifies the filtering conditions for the output table.
The MetaCode represents the filtering conditions, which must be an expression and can only include columns of dummy_table. Multiple conditions can be combined using logical operators (and, or). Only results that satisfy the filter conditions are included in the output table.
- Parameters:
val (Optional[MetaCode], optional) – The MetaCode representing the filtering conditions. Defaults to None.
- Returns:
The instance itself.
- Return type:
Self
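In swordfish the condition is given as MetaCode. The sketch below substitutes an ordinary Python predicate to illustrate the semantics only: conditions combined with and/or are evaluated against the incoming row's columns, and results for rows that fail the check never reach the output. It also assumes that filtered-out rows still update the calculation state and that the filter only decides which results are written. The columns price and qty and the running-max metric are hypothetical.

```python
from typing import Callable, Dict, List

Row = Dict[str, float]


def run_with_filter(rows: List[Row], keep: Callable[[Row], bool]) -> List[Row]:
    """Track a running max of price for every row, but output only rows where keep(row) is True."""
    running_max = float("-inf")
    out: List[Row] = []
    for row in rows:
        running_max = max(running_max, row["price"])  # state is updated for every row
        if keep(row):  # the filter only decides what is written to the output
            out.append({"price": row["price"], "qty": row["qty"], "max_price": running_max})
    return out


rows = [
    {"price": 10.0, "qty": 100.0},
    {"price": 12.0, "qty": 50.0},
    {"price": 11.0, "qty": 300.0},
]
# Similar in spirit to a condition such as "price > 10 and qty >= 100".
result = run_with_filter(rows, lambda r: r["price"] > 10 and r["qty"] >= 100)
print(result)  # [{'price': 11.0, 'qty': 300.0, 'max_price': 12.0}]
```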
- snapshot_dir(val=None)
Sets the directory where the streaming engine snapshot is saved.
The directory must already exist, or an exception will be raised. If a snapshot directory is specified, the system checks for an existing snapshot in the directory when creating the streaming engine.
If found, the snapshot is loaded to restore the engine’s state. Multiple streaming engines can share a directory, with snapshot files named after the engine names.
- Snapshot file extensions:
<engineName>.tmp: Temporary snapshot.
<engineName>.snapshot: A snapshot that has been flushed to disk.
<engineName>.old: If a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.
- Parameters:
val (Optional[Union[Path, str]], optional) – The directory path for saving the snapshot. Defaults to None.
- Returns:
The instance itself.
- Return type:
Self
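The three file extensions above suggest a write-then-rotate scheme. The sketch below models it with pathlib and os.replace. The order of operations (write .tmp, move the existing .snapshot to .old, then rename .tmp to .snapshot) is an assumption made for illustration, and save_snapshot is a hypothetical helper, not part of swordfish. Files are named after the engine, which is what lets several engines share one directory.

```python
import os
import tempfile
from pathlib import Path


def save_snapshot(snapshot_dir: Path, engine_name: str, payload: bytes) -> Path:
    """Hypothetical rotation: write <name>.tmp, keep the previous snapshot as <name>.old, publish <name>.snapshot."""
    if not snapshot_dir.is_dir():
        # Mirrors the documented rule that the directory must already exist.
        raise FileNotFoundError(f"snapshot directory does not exist: {snapshot_dir}")
    tmp = snapshot_dir / f"{engine_name}.tmp"
    final = snapshot_dir / f"{engine_name}.snapshot"
    old = snapshot_dir / f"{engine_name}.old"

    tmp.write_bytes(payload)  # 1. write the temporary snapshot
    if final.exists():
        os.replace(final, old)  # 2. keep the previous snapshot as <name>.old (overwrites an older .old)
    os.replace(tmp, final)  # 3. publish the new snapshot under its final name
    return final


with tempfile.TemporaryDirectory() as d:
    root = Path(d)
    save_snapshot(root, "rse1", b"state-v1")
    save_snapshot(root, "rse1", b"state-v2")
    names = sorted(p.name for p in root.iterdir())
    old_bytes = (root / "rse1.old").read_bytes()
    new_bytes = (root / "rse1.snapshot").read_bytes()

print(names)  # ['rse1.old', 'rse1.snapshot']
print(old_bytes, new_bytes)  # b'state-v1' b'state-v2'
```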
- snapshot_interval_in_msg_count(val=None)
Sets the number of messages to receive before saving the next snapshot.
- Parameters:
val (Optional[int], optional) – The number of messages before the next snapshot. Defaults to None.
- Returns:
The instance itself.
- Return type:
Self
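As a rough model of a message-count interval, the hypothetical SnapshotCounter below counts incoming messages and reports when a snapshot is due. It is not the engine's implementation. It simply treats None as "no count-based snapshots", matching the default shown above.

```python
from typing import List, Optional


class SnapshotCounter:
    """Hypothetical helper: signal that a snapshot is due after every `interval` messages."""

    def __init__(self, interval: Optional[int]) -> None:
        if interval is not None and interval <= 0:
            raise ValueError("interval must be a positive integer")
        self.interval = interval
        self.received = 0

    def on_message(self) -> bool:
        """Count one message; return True when a snapshot should be taken."""
        self.received += 1
        return self.interval is not None and self.received % self.interval == 0


counter = SnapshotCounter(interval=3)
due: List[int] = [i for i in range(1, 11) if counter.on_message()]
print(due)  # [3, 6, 9]
```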
- keep_order(val=None)
Specifies whether to preserve the insertion order of records in the output table.
If key_col contains a time column, the default value is True; otherwise, it is False.
- Parameters:
val (Optional[bool], optional) – Whether to preserve the insertion order. Defaults to None.
- Returns:
The instance itself.
- Return type:
Self
- key_purge_filter(val=None)
Sets the filtering conditions that identify the data to be purged from the cache.
To clean up data that is no longer needed after calculations, specify both key_purge_filter and key_purge_freq_in_second. key_purge_filter is MetaCode composed of conditional expressions that must refer to columns in the output table. The filter takes effect only when key_col is specified.
- Parameters:
val (Optional[MetaCode], optional) – The MetaCode filter conditions. Defaults to None.
- Returns:
The instance itself.
- Return type:
Self
- key_purge_freq_in_second(val=None)
Sets the time interval (in seconds) that triggers a purge. A purge can be triggered once the time since the last data ingestion meets or exceeds this interval.
This parameter takes effect only when key_col is specified. For each data ingestion, a purge is triggered only if all of the following conditions are met:
1. The time elapsed since the last data ingestion is equal to or greater than key_purge_freq_in_second (for the first check, the time elapsed between the data ingestion and engine creation is used).
2. If the first condition is met, key_purge_filter is applied to identify the data to be purged.
3. The number of groups containing data to be purged is equal to or greater than 10% of the total number of groups in the engine.
To check the engine status before and after the purge, access the ReactiveStateEngine.stat attribute, where the numGroups field indicates the number of groups in the reactive state engine.
- Parameters:
val (Optional[int], optional) – The time interval (in seconds) to trigger the purge. Defaults to None.
- Returns:
The instance itself.
- Return type:
Self
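The three purge conditions can be written as a single decision function. The sketch below is a plain-Python model, not swordfish code. groups_to_purge and is_stale are hypothetical. It assumes the purge filter is evaluated against each group's most recent output row, and "last_ts" is an invented output column. With 10 groups, purging requires at least 1 qualifying group (10%). With 20 groups, a single qualifying group (5%) is not enough.

```python
from typing import Callable, Dict, Optional, Set


def groups_to_purge(
    now: float,
    last_ingestion: Optional[float],
    engine_created: float,
    freq_in_second: int,
    latest_rows: Dict[str, dict],
    purge_filter: Callable[[dict], bool],
) -> Set[str]:
    """Return the group keys to purge on this ingestion, or an empty set if no purge is triggered."""
    # Condition 1: enough time has passed; engine creation is the reference for the first check.
    reference = engine_created if last_ingestion is None else last_ingestion
    if now - reference < freq_in_second:
        return set()
    # Condition 2: apply the purge filter to each group's latest output row.
    candidates = {key for key, row in latest_rows.items() if purge_filter(row)}
    # Condition 3: purge only if at least 10% of all groups qualify (integer math avoids float rounding).
    if len(candidates) * 10 < len(latest_rows):
        return set()
    return candidates


def is_stale(row: dict) -> bool:
    # Hypothetical purge condition on a hypothetical output column "last_ts".
    return row["last_ts"] < 500.0


rows = {f"S{i}": {"last_ts": 100.0 if i < 2 else 990.0} for i in range(10)}
print(sorted(groups_to_purge(1000.0, 900.0, 0.0, 60, rows, is_stale)))  # ['S0', 'S1']
print(groups_to_purge(1000.0, 990.0, 0.0, 60, rows, is_stale))  # set()
```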
- output_elapsed_microseconds(val=False)
Determines whether to output the elapsed time (in microseconds).
The elapsed time is measured from when the calculation is triggered to when the result is output for each window. When both output_elapsed_microseconds and useSystemTime are set to true, aggregate functions cannot be used in metrics.
- Parameters:
val (bool, optional) – Whether to output the elapsed time. Defaults to False.
- Returns:
The instance itself.
- Return type:
Self
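The sketch below shows one way to measure such an interval in microseconds with time.perf_counter_ns. timed_batch is a hypothetical helper, and attaching the elapsed value to every result row is an assumption made for illustration. The exact column the engine uses for this value is not described here.

```python
import time
from typing import Callable, List, Tuple


def timed_batch(batch: List[float], metric: Callable[[float], float]) -> List[Tuple[float, int]]:
    """Compute metric for each value and attach the batch's elapsed time in microseconds."""
    start_ns = time.perf_counter_ns()  # calculation triggered
    results = [metric(x) for x in batch]
    elapsed_us = (time.perf_counter_ns() - start_ns) // 1_000  # nanoseconds -> microseconds
    return [(r, elapsed_us) for r in results]


out = timed_batch([1.0, 2.0, 3.0], lambda x: x * 2)
print(out)
```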
- key_capacity(val=1024)
A positive integer indicating the amount of memory allocated for buffering the state of each group.
The memory is allocated on a row basis. The default value is 1024. For data with a large number of groups, setting this parameter appropriately can reduce latency.
- Parameters:
val (int, optional) – A positive integer. Defaults to 1024.
- Returns:
The instance itself.
- Return type:
Self
- parallelism(val=1)
A positive integer no greater than 63, indicating the maximum number of workers that can run in parallel.
The default value is 1. For large computation workloads, increasing this parameter can make better use of computing resources and reduce computation time.
Note:
parallelism cannot exceed the number of logical cores minus one.
- Parameters:
val (int, optional) – A positive integer. Defaults to 1.
- Returns:
The instance itself.
- Return type:
Self
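A client-side sanity check for the limits stated above (at most 63, and at most the number of logical cores minus one) might look like the following. max_parallelism and check_parallelism are hypothetical helpers, not swordfish functions. The sketch also assumes that the default of 1 is always accepted, including on a single-core machine, and it does not model any other limits the engine may enforce.

```python
import os
from typing import Optional


def max_parallelism(logical_cores: Optional[int] = None) -> int:
    """Upper bound implied by the documented limits: at most 63 and at most logical cores minus one."""
    cores = logical_cores if logical_cores is not None else (os.cpu_count() or 1)
    # Assumption: 1 (the default) is always accepted, even on a single-core machine.
    return max(1, min(63, cores - 1))


def check_parallelism(val: int, logical_cores: Optional[int] = None) -> int:
    """Raise ValueError if val falls outside 1..max_parallelism(); otherwise return it unchanged."""
    upper = max_parallelism(logical_cores)
    if val < 1 or val > upper:
        raise ValueError(f"parallelism must be between 1 and {upper}, got {val}")
    return val


print(max_parallelism(16), max_parallelism(128))  # 15 63
print(check_parallelism(8, logical_cores=16))  # 8
```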
- output_handler(val=None)
A unary function or a partial function with a single unfixed parameter.
If set, the engine will not write the calculation results to the output table directly. Instead, the results will be passed as a parameter to the specified function.
- Parameters:
val (Optional[FunctionDef], optional) – A unary function or a partial function with a single unfixed parameter. Defaults to None, which means the results are written to the output table.
- Returns:
The instance itself.
- Return type:
Self
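"A partial function with a single unfixed parameter" means a function whose other arguments are already bound, so it can be called with just the result batch. The sketch below illustrates that idea with the standard library's functools.partial. In swordfish, output_handler is typed as FunctionDef, which is presumably a swordfish function object rather than a plain Python callable, so this shows the calling convention only. route and the "rse1" tag are hypothetical.

```python
from functools import partial
from typing import Any, List


def route(sink: List[Any], prefix: str, results: Any) -> None:
    """Hypothetical handler: tag each batch of results and collect it in `sink`."""
    sink.append((prefix, results))


collected: List[Any] = []
# Bind the first two parameters; only `results` stays unfixed, so this is a unary callable.
handler = partial(route, collected, "rse1")

# An engine would call the handler once per output batch; here it is called by hand.
handler([("A", 1.0)])
handler([("B", 2.0)])
print(collected)  # [('rse1', [('A', 1.0)]), ('rse1', [('B', 2.0)])]
```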
- msg_as_table(val=False)
Sets whether the output data is passed to the function specified by output_handler as a Table or as an AnyVector. If True, the data is passed as a Table; otherwise, it is passed as an AnyVector of columns.
- Parameters:
val (bool, optional) – Whether to pass data as a Table (True) or as an AnyVector (False). Defaults to False.
- Returns:
The instance itself.
- Return type:
Self
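The difference between the two shapes can be pictured with plain Python containers. In the sketch below, a dict of column name to values stands in for a Table, and a list of column lists stands in for an AnyVector of columns. Both are simplified models, not swordfish types, and package_output is a hypothetical helper. The practical consequence is that with msg_as_table=False the handler has to rely on column position rather than column name.

```python
from typing import Any, Dict, List, Union


def package_output(
    columns: Dict[str, List[Any]], msg_as_table: bool
) -> Union[Dict[str, List[Any]], List[List[Any]]]:
    """Shape one output batch the way a handler would receive it (simplified model)."""
    if msg_as_table:
        # Table-like: column names travel with the data.
        return {name: list(values) for name, values in columns.items()}
    # AnyVector-like: only the columns, in order, without their names.
    return [list(values) for values in columns.values()]


batch = {"sym": ["A", "B"], "cumsum": [3.0, 15.0]}
print(package_output(batch, msg_as_table=True))   # {'sym': ['A', 'B'], 'cumsum': [3.0, 15.0]}
print(package_output(batch, msg_as_table=False))  # [['A', 'B'], [3.0, 15.0]]
```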
- submit()
Abstract method to build a StreamEngine.
- Returns:
An instance of a built StreamEngine.
- Return type: