CrossSectionalEngineBuilder
- class swordfish._engine.CrossSectionalEngineBuilder(name, table_schema, key_col, *, metrics=None, output=None, triggering_pattern='per_batch', triggering_interval=None, use_system_time=True, time_col=None, last_batch_only=False, context_by_col=None, snapshot_dir=None, snapshot_interval_in_msg_count=None, output_elapsed_microseconds=False, round_time=True, key_filter=None, updated_context_groups_only=False)
- Parameters:
name (str)
key_col (List[str] | str)
output (Table)
triggering_pattern (Literal['per_batch', 'per_row', 'interval', 'key_count', 'data_interval'])
triggering_interval (Any)
use_system_time (bool)
time_col (str | None)
last_batch_only (bool)
context_by_col (List[str] | str | None)
snapshot_dir (Path | str | None)
snapshot_interval_in_msg_count (int | None)
output_elapsed_microseconds (bool)
round_time (bool)
key_filter (MetaCode | None)
updated_context_groups_only (bool)
- metrics(val=None)
Specifies the formulas for calculation using MetaCode or an AnyVector.
- The value can be:
Built-in or user-defined aggregate functions, e.g.,
<myFunc(qty)>
>>> @F.swordfish_udf
... def myFunc(x):
...     return x + 1
...
>>> with sf.meta_code() as m:
...     metrics = myFunc(m.col("qty"))
Expressions on previous results, e.g.,
<avg(price1)-avg(price2)>.
>>> with sf.meta_code() as m:
...     metrics = F.avg(m.col("price1")) - F.avg(m.col("price2"))
Calculations on multiple columns, e.g.,
<[std(price1-price2)]>.
>>> with sf.meta_code() as m:
...     metrics = F.std(m.col("price1") - m.col("price2"))
Functions with multiple returns, such as
<func(price) as `col1`col2>.
>>> with sf.meta_code() as m:
...     metrics = m.col_alias(func(m.col("price")), ["col1", "col2"])
The column names specified in metrics are not case-sensitive, so their case need not match the column names of the input table.
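To make the idea of a cross-section concrete: the engine keeps the most recent record for each value of key_col and evaluates metrics across those records. The sketch below models that in plain Python rather than with swordfish; `latest_by_key` and `cross_section_metric` are hypothetical helpers, and records are assumed to arrive in time order.

```python
from statistics import mean


def latest_by_key(records, key_col):
    """Keep only the most recent record per key (hypothetical model of the engine state)."""
    table = {}
    for rec in records:  # records arrive in order; a later record overwrites its key
        table[rec[key_col]] = rec
    return table


def cross_section_metric(records, key_col, metric):
    """Apply `metric` to the current cross-section: one row per key."""
    rows = list(latest_by_key(records, key_col).values())
    return metric(rows)


ticks = [
    {"sym": "A", "price1": 10.0, "price2": 9.0},
    {"sym": "B", "price1": 20.0, "price2": 18.0},
    {"sym": "A", "price1": 12.0, "price2": 11.0},  # replaces the first "A" row
]

# Analogous to an avg(price1) metric: averages the latest price1 of A and B.
avg_price1 = cross_section_metric(ticks, "sym", lambda rows: mean(r["price1"] for r in rows))
print(avg_price1)  # 16.0
```

Because only the latest row per key is kept, the earlier price of "A" no longer contributes to the result.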
- output(val=None)
Specifies the output table for the results.
If context_by_col is not specified, the output columns are in the following order:
The first column is of TIMESTAMP type, storing the time when each calculation starts. If time_col is specified, it takes the values of time_col.
The column(s) storing calculation results. The data types of the column(s) must be the same as the results of metrics.
A column of LONG type storing the calculation time of each batch. Output only when output_elapsed_microseconds=True.
A column of INT type storing the number of records of each batch. Output only when output_elapsed_microseconds=True.
If context_by_col is specified, the output columns are in the following order:
The first column is of TIMESTAMP type, storing the time when each calculation starts. If time_col is specified, it takes the values of time_col.
The second column is the column specified by context_by_col.
The column(s) storing calculation results. The data types of the column(s) must be the same as the results of metrics.
A column of LONG type storing the calculation time of each batch. Output only when output_elapsed_microseconds=True.
A column of INT type storing the number of records of each batch. Output only when output_elapsed_microseconds=True.
- Parameters:
val (Table, optional) – an in-memory table or a DFS table, by default None
- Returns:
The instance itself.
- Return type:
Self
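The column order described above can be restated as a small helper, for example to check an output table's schema before creating it. This is a hypothetical sketch, not part of swordfish: the metric and context columns are supplied by the caller as (name, type) pairs, and the names `time`, `elapsed` and `batch_size` are placeholders, since only the order and types are documented.

```python
def expected_output_columns(metrics, context_by=None, output_elapsed_microseconds=False):
    """Return (name, type) pairs in the documented output column order.

    metrics: list of (name, type) pairs matching the metric results.
    context_by: optional (name, type) pair for the context_by_col column.
    """
    cols = [("time", "TIMESTAMP")]  # calculation start time, or the time_col values
    if context_by is not None:
        cols.append(context_by)  # second column when context_by_col is set
    cols.extend(metrics)
    if output_elapsed_microseconds:
        cols += [("elapsed", "LONG"), ("batch_size", "INT")]
    return cols


layout = expected_output_columns(
    [("avg_price", "DOUBLE")],
    context_by=("market", "SYMBOL"),
    output_elapsed_microseconds=True,
)
print([t for _, t in layout])  # ['TIMESTAMP', 'SYMBOL', 'DOUBLE', 'LONG', 'INT']
```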
- triggering_pattern(val='per_batch')
Specifies how to trigger the calculations. The engine returns a result every time a calculation is triggered.
- Parameters:
val (Literal["per_batch", "per_row", "interval", "key_count", "data_interval"], optional) –
'per_batch' (default): Calculates when a batch of data arrives.
'per_row': Calculates when a new record arrives.
'interval': Calculates at intervals specified by triggering_interval, using system time.
'key_count': When data with the same timestamp arrives in batches, a calculation is triggered when the number of keys with the latest timestamp reaches triggering_interval, or when data with a newer timestamp arrives.
'data_interval': Calculates at intervals based on the timestamps in the data. Requires time_col to be specified and use_system_time to be False.
Note: To use 'key_count' or 'data_interval', time_col must be specified and use_system_time must be set to False. Out-of-order data is discarded in these cases.
- Returns:
The instance itself.
- Return type:
Self
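The constraints in the note above, together with the time_col requirement described under use_system_time, can be checked up front. The helper below is a hypothetical sketch that only restates those documented rules; whether swordfish performs the same checks itself, and which error it raises, is not assumed here.

```python
from typing import Optional

VALID_PATTERNS = {"per_batch", "per_row", "interval", "key_count", "data_interval"}


def check_triggering_config(triggering_pattern: str, use_system_time: bool,
                            time_col: Optional[str]) -> None:
    """Raise ValueError if the documented triggering constraints are violated."""
    if triggering_pattern not in VALID_PATTERNS:
        raise ValueError(f"unknown triggering_pattern: {triggering_pattern!r}")
    if not use_system_time and time_col is None:
        raise ValueError("use_system_time=False requires time_col")
    if triggering_pattern in ("key_count", "data_interval"):
        # Data-driven patterns need record timestamps, not system time.
        if time_col is None or use_system_time:
            raise ValueError(
                f"{triggering_pattern!r} requires time_col and use_system_time=False"
            )


check_triggering_config("data_interval", use_system_time=False, time_col="ts")  # passes
```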
- triggering_interval(val=None)
Sets the interval or condition that triggers a calculation.
The behavior of triggering_interval depends on the value of triggering_pattern:
If triggering_pattern='interval', triggering_interval is a positive integer giving the interval in milliseconds between two adjacent calculations. The default is 1,000 milliseconds. A calculation is triggered every triggering_interval milliseconds if the data in the engine has not yet been calculated.
If triggering_pattern='key_count', triggering_interval can be either:
An integer specifying the threshold for the number of uncalculated records.
A tuple of two elements:
The first element is an integer specifying the threshold for the number of records with the latest timestamp that triggers a calculation.
The second element can be either:
An integer threshold.
A Duration value.
For example, when triggering_interval is set to (c1, c2):
If c2 is an integer and the number of keys with the latest timestamp t1 does not reach c1, no calculation is triggered, and the system continues to store data with a greater timestamp t2 (t2 > t1). Data with t1 is calculated when either of the following happens: the number of keys with timestamp t2 reaches c2, or data with a greater timestamp t3 (t3 > t2) arrives. Note that c2 must be smaller than c1.
If c2 is a Duration and the number of keys with the latest timestamp t1 does not reach c1, no calculation is triggered, and the system continues to store data with a greater timestamp t2 (t2 > t1). Once data with t2 starts to arrive, data with t1 is not calculated until any of the following happens: the number of keys with timestamp t1 reaches c1, data with a greater timestamp t3 (t3 > t2) arrives, or the Duration c2 elapses.
If triggering_pattern='data_interval', triggering_interval is a positive integer in the same units as the timestamps in time_col. The default is 1,000 milliseconds. A calculation is triggered for all data in the current window when the first record of the next window arrives, and only for windows that contain data.
- Parameters:
val (Any, optional) – The triggering interval or conditions. Defaults to None.
- Returns:
The instance itself.
- Return type:
Self
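The two data-driven modes can be simulated in plain Python to see when calculations fire. This is a simplified model under stated assumptions, not the engine's implementation: it covers only the integer form of triggering_interval for 'key_count', it assumes a timestamp that has already triggered does not trigger again for late keys, and it assumes 'data_interval' windows are aligned to multiples of the interval. Out-of-order records are dropped, as the triggering_pattern note describes.

```python
def key_count_triggers(records, threshold):
    """Simulate 'key_count' with an integer triggering_interval.

    records: (timestamp, key) pairs in arrival order.
    Returns (timestamp, keys) for each triggered calculation.
    """
    triggers = []
    current_ts, keys, fired = None, set(), False
    for ts, key in records:
        if current_ts is not None and ts < current_ts:
            continue  # out-of-order data is discarded
        if current_ts is None or ts > current_ts:
            # A newer timestamp flushes the previous one if it has not fired yet.
            if current_ts is not None and not fired:
                triggers.append((current_ts, sorted(keys)))
            current_ts, keys, fired = ts, set(), False
        keys.add(key)
        if not fired and len(keys) >= threshold:
            triggers.append((current_ts, sorted(keys)))
            fired = True  # assumption: fire at most once per timestamp
    return triggers


def data_interval_triggers(timestamps, interval):
    """Simulate 'data_interval': return the start of each window that is calculated.

    Assumes windows are aligned to multiples of `interval`.
    """
    triggers = []
    window_start = None
    for ts in timestamps:
        start = ts - ts % interval
        if window_start is None:
            window_start = start
        elif start < window_start:
            continue  # a record from an earlier window is treated as out-of-order
        elif start > window_start:
            # The first record of a later window closes the current one.
            triggers.append(window_start)
            window_start = start
    return triggers


print(key_count_triggers([(1, "A"), (1, "B"), (1, "C"), (2, "A"), (1, "D"), (2, "B"), (3, "A")], 3))
# [(1, ['A', 'B', 'C']), (2, ['A', 'B'])]
print(data_interval_triggers([0, 500, 999, 1000, 2500, 2600, 4100], 1000))
# [0, 1000, 2000]
```

In the second example the window starting at 3000 has no data, so no calculation is triggered for it, and the window starting at 4000 is still waiting for a later record.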
- use_system_time(val=True)
Sets whether calculations are performed based on the system time when data is ingested into the engine.
If use_system_time is True, the time column of the output table is the system time.
If use_system_time is False, the time_col parameter must be specified, and the time column of the output table uses the timestamp of each record.
- Parameters:
val (bool, optional) – Indicates whether to use system time for calculations. Defaults to True.
- Returns:
The instance itself.
- Return type:
Self
- time_col(val=None)
Specifies the time column in the stream table to which the engine subscribes when use_system_time is False. The column must be of TIMESTAMP type.
- Parameters:
val (Optional[str], optional) – The name of the time column. Defaults to None.
- Returns:
The instance itself.
- Return type:
Self
- last_batch_only(val=False)
Determines whether to keep only the records with the latest timestamp in the engine.
When last_batch_only is True, triggering_pattern must be set to 'key_count', and the cross-sectional engine maintains only the key values with the latest timestamp for calculation. Otherwise, the engine updates and retains all values for calculation.
- Parameters:
val (bool, optional) – Whether to keep only the latest timestamped records. Defaults to False.
- Returns:
The instance itself.
- Return type:
Self
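The difference between the two settings can be modeled as which records stay in the engine's state. This is a hypothetical plain-Python sketch: it assumes records arrive in timestamp order and that the state is keyed by key_col, keeping each key's most recent record.

```python
def engine_state(records, last_batch_only):
    """records: (timestamp, key, value) tuples in arrival order.

    Returns the retained state as {key: (timestamp, value)}.
    """
    state = {}
    latest_ts = None
    for ts, key, value in records:
        state[key] = (ts, value)  # each key keeps its most recent record
        if latest_ts is None or ts > latest_ts:
            latest_ts = ts
    if last_batch_only:
        # Keep only keys whose record carries the latest timestamp.
        state = {k: v for k, v in state.items() if v[0] == latest_ts}
    return state


records = [(1, "A", 10.0), (1, "B", 20.0), (2, "A", 11.0)]
print(engine_state(records, last_batch_only=False))  # {'A': (2, 11.0), 'B': (1, 20.0)}
print(engine_state(records, last_batch_only=True))   # {'A': (2, 11.0)}
```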
- context_by_col(val=None)
Specifies the grouping column(s) by which calculations are performed within groups. This parameter takes effect only if metrics and output are specified.
If metrics contains only aggregate functions, the results are the same as those of a SQL query using group by. Otherwise, the results are consistent with those of a query using context by.
- Parameters:
val (Optional[Union[List[str], str]], optional) – The grouping column(s) for the calculation. Defaults to None.
- Returns:
The instance itself.
- Return type:
Self
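The two result shapes can be illustrated without the engine. In this plain-Python sketch, grouping is by a hypothetical `market` column: an aggregate metric such as avg(price) yields one value per group (group by semantics), while a non-aggregate metric such as price - avg(price) yields one value per record, computed within its group (context by semantics).

```python
from collections import defaultdict
from statistics import mean

rows = [
    {"sym": "A", "market": "SH", "price": 10.0},
    {"sym": "B", "market": "SH", "price": 20.0},
    {"sym": "C", "market": "SZ", "price": 30.0},
]


def group_rows(rows, col):
    """Group rows by the value of `col`, preserving first-seen order."""
    groups = defaultdict(list)
    for r in rows:
        groups[r[col]].append(r)
    return groups


# Aggregate metric (like avg(price)): one result per group.
agg = {g: mean(r["price"] for r in rs) for g, rs in group_rows(rows, "market").items()}
print(agg)  # {'SH': 15.0, 'SZ': 30.0}

# Non-aggregate metric (like price - avg(price)): one result per record, within its group.
ctx = []
for g, rs in group_rows(rows, "market").items():
    group_avg = mean(r["price"] for r in rs)
    ctx.extend((g, r["sym"], r["price"] - group_avg) for r in rs)
print(ctx)  # [('SH', 'A', -5.0), ('SH', 'B', 5.0), ('SZ', 'C', 0.0)]
```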
- snapshot_dir(val=None)
Sets the directory where the streaming engine snapshot is saved.
The directory must already exist; otherwise, an exception is raised. If a snapshot directory is specified, the system checks for an existing snapshot in the directory when creating the streaming engine.
If one is found, the snapshot is loaded to restore the engine's state. Multiple streaming engines can share a directory; snapshot files are named after the engine names.
- Snapshot file extensions:
<engineName>.tmp: A temporary snapshot.
<engineName>.snapshot: A snapshot that has been flushed to disk.
<engineName>.old: If a snapshot with the same name already exists, the previous snapshot is renamed to <engineName>.old.
- Parameters:
val (Optional[Union[Path, str]], optional) – The directory path for saving the snapshot. Defaults to None.
- Returns:
The instance itself.
- Return type:
Self
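The file naming scheme above suggests a write-then-rename pattern. The sketch below is a plain-Python model of that pattern, not swordfish's implementation: the order of the steps and the helpers `write_snapshot` and `load_snapshot` are assumptions made for illustration, and the snapshot payload is treated as opaque bytes.

```python
import os
import tempfile
from pathlib import Path


def write_snapshot(snapshot_dir, engine_name, payload: bytes) -> Path:
    """Hypothetical model: write .tmp, move any existing .snapshot to .old, then publish."""
    d = Path(snapshot_dir)
    if not d.is_dir():
        raise FileNotFoundError(f"snapshot directory does not exist: {d}")
    tmp = d / f"{engine_name}.tmp"
    final = d / f"{engine_name}.snapshot"
    old = d / f"{engine_name}.old"
    tmp.write_bytes(payload)        # 1. write the temporary snapshot
    if final.exists():
        os.replace(final, old)      # 2. keep the previous snapshot as .old
    os.replace(tmp, final)          # 3. publish the new snapshot
    return final


def load_snapshot(snapshot_dir, engine_name):
    """Return the flushed snapshot for `engine_name`, or None if there is none."""
    path = Path(snapshot_dir) / f"{engine_name}.snapshot"
    return path.read_bytes() if path.exists() else None


with tempfile.TemporaryDirectory() as d:
    write_snapshot(d, "csEngine", b"v1")
    write_snapshot(d, "csEngine", b"v2")
    print(sorted(p.name for p in Path(d).iterdir()))  # ['csEngine.old', 'csEngine.snapshot']
    print(load_snapshot(d, "csEngine"))               # b'v2'
```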
- snapshot_interval_in_msg_count(val=None)
Sets the number of messages to receive before saving the next snapshot.
- Parameters:
val (Optional[int], optional) – The number of messages before the next snapshot. Defaults to None.
- Returns:
The instance itself.
- Return type:
Self
- output_elapsed_microseconds(val=False)
Determines whether to output the elapsed time (in microseconds).
The elapsed time is measured from when the calculation is triggered to when the result is output for each window. When both output_elapsed_microseconds and use_system_time are set to True, aggregate functions cannot be used in metrics.
- Parameters:
val (bool, optional) – Whether to output the elapsed time. Defaults to False.
- Returns:
The instance itself.
- Return type:
Self
- round_time(val=True)
Aligns the window boundary based on the specified alignment rule.
If the time precision is in milliseconds or seconds and the step is greater than one minute, this method determines whether to apply multi-minute or one-minute alignment.
- Parameters:
val (bool, optional) – If True, uses the multi-minute rule for alignment. If False, uses the one-minute rule. Defaults to True.
- Returns:
The instance itself.
- Return type:
Self
- key_filter(val=None)
Specifies the conditions for filtering keys in the keyed table returned by the engine.
Only data whose keys satisfy the filtering conditions is used in calculations. The MetaCode represents an expression or function call that returns a Boolean vector.
- Parameters:
val (Optional[MetaCode], optional) – MetaCode of the filtering conditions. Defaults to None.
- Returns:
The instance itself.
- Return type:
Self
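Conceptually, the filter is evaluated against the keys and yields one Boolean per key; only keys marked True take part in the calculation. This plain-Python sketch models that with a callable in place of MetaCode; the helper `apply_key_filter` and the example predicate (keeping symbols that start with "6") are hypothetical.

```python
def apply_key_filter(keyed_table, predicate):
    """keyed_table: {key: row}; predicate: takes the list of keys and returns one bool per key."""
    keys = list(keyed_table)
    mask = predicate(keys)
    if len(mask) != len(keys):
        raise ValueError("the filter must return one Boolean per key")
    return {k: keyed_table[k] for k, keep in zip(keys, mask) if keep}


table = {"600000": {"price": 10.0}, "000001": {"price": 12.0}, "601318": {"price": 45.0}}
filtered = apply_key_filter(table, lambda keys: [k.startswith("6") for k in keys])
print(list(filtered))  # ['600000', '601318']
```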
- updated_context_groups_only(val=False)
Indicates whether to compute only the groups updated with new data since the last output.
- Parameters:
val (bool, optional) – Whether to compute only updated groups. Defaults to False.
- Returns:
The instance itself.
- Return type:
Self
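The effect of this flag can be modeled by tracking which context groups received data since the previous output. The class below is a hypothetical plain-Python sketch, not a swordfish API; it assumes that every output resets the set of updated groups.

```python
class GroupRecomputeTracker:
    """Track which context groups received data since the last output (hypothetical model)."""

    def __init__(self):
        self.all_groups = set()
        self.updated = set()

    def on_record(self, group):
        self.all_groups.add(group)
        self.updated.add(group)

    def groups_for_next_output(self, updated_context_groups_only):
        groups = self.updated if updated_context_groups_only else self.all_groups
        result = sorted(groups)  # copy before resetting the updated set
        self.updated.clear()
        return result


t = GroupRecomputeTracker()
t.on_record("SH")
t.on_record("SZ")
print(t.groups_for_next_output(True))   # ['SH', 'SZ']
t.on_record("SH")
print(t.groups_for_next_output(True))   # ['SH']
print(t.groups_for_next_output(False))  # ['SH', 'SZ']
```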
- submit()
Abstract method to build a StreamEngine.
- Returns:
An instance of a built StreamEngine.
- Return type: