TimeSeriesEngine
- class swordfish._swordfishcpp.TimeSeriesEngine
The time-series streaming engine conducts real-time time-series calculations with moving windows.
TimeSeriesEngine.create returns a Builder object; call submit on it to create an Engine object, into which you can ingest data for stream processing.
There are two types of aggregate operators in the time-series engine: incremental operators and full operators. Incremental operators aggregate data incrementally as it arrives, without keeping historical data. Full operators (e.g., user-defined aggregate functions, unoptimized built-in aggregate functions, or functions with nested state functions) keep all the data in a window and recompute the output as a full refresh whenever new data arrives.
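The difference between the two operator types can be illustrated with a small pure-Python sketch. It does not use the swordfish API; the class names IncrementalMean and FullRefresh and the function custom_range are hypothetical and exist only for this illustration.

```python
class IncrementalMean:
    """Incremental operator: keeps running totals only, never the raw records."""

    def __init__(self):
        self.total = 0.0
        self.count = 0

    def update(self, value):
        # Constant work per record: fold the new value into the running state.
        self.total += value
        self.count += 1
        return self.total / self.count


class FullRefresh:
    """Full operator: stores every record in the window and recomputes from scratch."""

    def __init__(self, func):
        self.func = func
        self.values = []

    def update(self, value):
        # Work grows with the window: the whole buffer is re-aggregated.
        self.values.append(value)
        return self.func(self.values)


def custom_range(values):
    # Stand-in for a user-defined aggregate that has no incremental form here.
    return max(values) - min(values)


inc = IncrementalMean()
full = FullRefresh(custom_range)
inc_results = [inc.update(v) for v in [2.0, 4.0, 9.0]]
full_results = [full.update(v) for v in [2.0, 4.0, 9.0]]
print(inc_results)   # [2.0, 3.0, 5.0]
print(full_results)  # [0.0, 2.0, 7.0]
```

The incremental version holds two numbers regardless of window length, while the full version holds every record, which is why the choice of metric affects memory use and latency.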
The following aggregate operators in the time-series engine are optimized for incremental computation:
corr, covar, first, last, max, med, min, percentile, quantile, std, var, sum, sum2, sum3, sum4, wavg, wsum, count, firstNot, ifirstNot, lastNot, ilastNot, imax, imin, nunique, prod, sem, mode, searchK, beta, avg.
Windowing Logic
Window boundaries: The engine automatically adjusts the starting point of the first window. (See the parameter descriptions for step and round_time, and the Alignment Rules section.)
Window properties:
- window_size: the size of each window;
- closed: whether the left or right boundary of a window is inclusive;
- step: the duration of time between windows;
- use_system_time: whether records are windowed by the time column in the data or by the system time of data ingestion.
Calculation Rules
- If time_col is specified, its values must be increasing. If key_col is specified to group the data, the values in time_col must be increasing within each group specified by key_col. Otherwise, out-of-order data will be discarded.
- If use_system_time = True, the calculation of a window is triggered as soon as the window ends. If use_system_time = False (with time_col specified), the calculation of a window is triggered by the arrival of the next record after the window ends. To trigger the calculation for the uncalculated windows, you can specify the parameter update_time or force_trigger_time.
- If fill is unspecified or "none", only windows with calculation results are output. If fill is specified, all windows are output, and the empty windows are filled using the specified filling method.
- If update_time = 0, incoming records in the current window can be immediately calculated and output.
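The first two calculation rules can be pictured with a toy model in plain Python. This is a sketch under simplifying assumptions, not the engine's implementation: it assumes window_size equals step (non-overlapping windows), left-closed windows aligned to multiples of the window size, a sum metric, and that records with equal timestamps are accepted. The class TumblingSumSketch and all of its attributes are hypothetical names.

```python
from collections import defaultdict


class TumblingSumSketch:
    """Toy event-time windows [start, start + size), one running sum per key."""

    def __init__(self, size):
        self.size = size
        self.last_time = {}                    # key -> latest accepted event time
        self.window_start = {}                 # key -> start of the open window
        self.running_sum = defaultdict(float)  # key -> sum of the open window
        self.output = []                       # (window_start, key, sum) rows
        self.discarded = []                    # out-of-order records

    def append(self, key, time, value):
        # Rule 1: within a group, event times must not go backwards.
        if key in self.last_time and time < self.last_time[key]:
            self.discarded.append((key, time, value))
            return
        self.last_time[key] = time
        start = time // self.size * self.size
        open_start = self.window_start.get(key)
        # Rule 2: the first record past the end of the open window triggers its output.
        if open_start is not None and start > open_start:
            self.output.append((open_start, key, self.running_sum[key]))
            self.running_sum[key] = 0.0
        self.window_start[key] = start
        self.running_sum[key] += value


engine = TumblingSumSketch(size=60)
for key, t, v in [("A", 0, 1.0), ("A", 30, 2.0), ("A", 10, 5.0), ("A", 61, 4.0)]:
    engine.append(key, t, v)

print(engine.output)     # [(0, 'A', 3.0)]
print(engine.discarded)  # [('A', 10, 5.0)]
```

The window starting at 60 stays open after the last record because nothing later has arrived to close it, which is the situation update_time and force_trigger_time are meant to address.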
Other Features
- Data/state cleanup: You can set a cleanup rule to clear historical data. (See parameters key_purge_filter and key_purge_freq_in_sec.)
- Snapshot: The snapshot mechanism is used to restore the streaming engine to the latest snapshot after a system interruption. (See parameters snapshot_dir and snapshot_interval_in_msg_count.)
Alignment Rules
To facilitate observation and comparison of calculation results, the engine automatically adjusts the starting point of the first window. The alignment size (an integer) is determined by the parameters step and round_time, and by the precision of time_column. When the time-series engine calculates within groups, the windows of all groups are uniformly aligned, so the window boundaries are the same for every group.
Case 1: time_column is of type Minute (HH:mm)
- If round_time = False:
step          alignment_size
0 ~ 2         2
3             3
4 ~ 5         5
6 ~ 10        10
11 ~ 15       15
16 ~ 20       20
21 ~ 30       30
> 30          60 (1 hour)
- If round_time = True: The value of alignment_size is the same as in the table above if step ≤ 30. If step > 30, then:
step          alignment_size
31 ~ 60       60 (1 hour)
61 ~ 120      120 (2 hours)
121 ~ 180     180 (3 hours)
181 ~ 300     300 (5 hours)
301 ~ 600     600 (10 hours)
601 ~ 900     900 (15 hours)
901 ~ 1200    1200 (20 hours)
1201 ~ 1800   1800 (30 hours)
> 1800        3600 (60 hours)
Case 2: time_column is of type Datetime (yyyy-MM-dd HH:mm:ss) or Second (HH:mm:ss)
- If round_time = False:
step          alignment_size
0 ~ 2         2
3             3
4 ~ 5         5
6 ~ 10        10
11 ~ 15       15
16 ~ 20       20
21 ~ 30       30
> 30          60 (1 minute)
- If round_time = True: The value of alignment_size is the same as in the table above if step ≤ 30. If step > 30, then:
step          alignment_size
31 ~ 60       60 (1 minute)
61 ~ 120      120 (2 minutes)
121 ~ 180     180 (3 minutes)
181 ~ 300     300 (5 minutes)
301 ~ 600     600 (10 minutes)
601 ~ 900     900 (15 minutes)
901 ~ 1200    1200 (20 minutes)
1201 ~ 1800   1800 (30 minutes)
> 1800        3600 (1 hour)
Case 3: time_column is of type Timestamp (yyyy-MM-dd HH:mm:ss.mmm) or TIME (HH:mm:ss.mmm)
- If round_time = False:
step              alignment_size
0 ~ 2ns           2ns
3ns ~ 5ns         5ns
6ns ~ 10ns        10ns
11ns ~ 20ns       20ns
21ns ~ 25ns       25ns
26ns ~ 50ns       50ns
51ns ~ 100ns      100ns
101ns ~ 200ns     200ns
201ns ~ 250ns     250ns
251ns ~ 500ns     500ns
> 500ns           1000ns
- If round_time = True: The value of alignment_size is the same as in the table above if step ≤ 30000. If step > 30000, then:
step                alignment_size
30001 ~ 60000       60000 (1 minute)
60001 ~ 120000      120000 (2 minutes)
120001 ~ 300000     300000 (5 minutes)
300001 ~ 600000     600000 (10 minutes)
600001 ~ 900000     900000 (15 minutes)
900001 ~ 1200000    1200000 (20 minutes)
1200001 ~ 1800000   1800000 (30 minutes)
> 1800000           3600000 (1 hour)
Case 4: time_column is of type Nanotimestamp (yyyy-MM-dd HH:mm:ss.nnnnnnnnn) or NANOTIME (HH:mm:ss.nnnnnnnnn)
- If round_time = False:
step              alignment_size
0 ~ 2ns           2ns
3ns ~ 5ns         5ns
6ns ~ 10ns        10ns
11ns ~ 20ns       20ns
21ns ~ 25ns       25ns
26ns ~ 50ns       50ns
51ns ~ 100ns      100ns
101ns ~ 200ns     200ns
201ns ~ 250ns     250ns
251ns ~ 500ns     500ns
> 500ns           1000ns
- If round_time = True:
step              alignment_size
1000ns ~ 1ms      1ms
1ms ~ 10ms        10ms
10ms ~ 100ms      100ms
100ms ~ 1s        1s
1s ~ 2s           2s
2s ~ 3s           3s
3s ~ 5s           5s
5s ~ 10s          10s
10s ~ 15s         15s
15s ~ 20s         20s
20s ~ 30s         30s
> 30s             1min
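The lookup tables and the first-window rule of this section, x / alignment_size * alignment_size + step - window_size with integer division, can be sketched in plain Python. The helper names alignment_size_seconds and first_window_start are hypothetical. The lookup covers only Case 2 (a time column with second precision), and the worked call reuses the Timestamp figures from this section (alignment_size = 60000 ms), assuming timestamps are counted in milliseconds since the Unix epoch.

```python
from bisect import bisect_left
from datetime import datetime, timedelta

# Case 2 tables, stored as the upper bound of each step range (in seconds).
_BOUNDS_LE_30 = [2, 3, 5, 10, 15, 20, 30]
_BOUNDS_GT_30 = [60, 120, 180, 300, 600, 900, 1200, 1800]


def alignment_size_seconds(step, round_time):
    """Look up alignment_size for a Datetime/Second time column."""
    if step <= 30:
        return _BOUNDS_LE_30[bisect_left(_BOUNDS_LE_30, step)]
    if not round_time:
        return 60  # "> 30" row of the round_time = False table
    i = bisect_left(_BOUNDS_GT_30, step)
    return _BOUNDS_GT_30[i] if i < len(_BOUNDS_GT_30) else 3600


def first_window_start(x, alignment_size, step, window_size):
    """Apply the first-window rule; // mirrors the integer-only division."""
    return x // alignment_size * alignment_size + step - window_size


print(alignment_size_seconds(7, round_time=True))    # 10
print(alignment_size_seconds(45, round_time=True))   # 60
print(alignment_size_seconds(61, round_time=True))   # 120

# Worked Timestamp example: first record at 2018.10.08T01:01:01.365.
epoch = datetime(1970, 1, 1)
first_record = datetime(2018, 10, 8, 1, 1, 1, 365000)
x_ms = (first_record - epoch) // timedelta(milliseconds=1)
start_ms = first_window_start(x_ms, alignment_size=60000, step=60000, window_size=120000)
start_dt = epoch + timedelta(milliseconds=start_ms)
print(start_dt)  # 2018-10-08 01:00:00
```

Flooring 01:01:01.365 to the 60000 ms alignment gives 01:01:00.000; adding step (one minute) and subtracting window_size (two minutes) moves the start to 01:00:00.000.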
If the time of the first record is x, with data type Timestamp, then the starting time of the first window is adjusted to timeType_cast(x / alignment_size * alignment_size + step - window_size), where "/" keeps only the integer part of the quotient. For example, if the time of the first record is 2018.10.08T01:01:01.365, window_size = 120000, and step = 60000, then alignment_size = 60000, and the starting time of the first window is timestamp(2018.10.08T01:01:01.365 / 60000 * 60000 + 60000 - 120000) = 2018.10.08T01:00:00.000.
- engine_type: EngineType
The type of the streaming engine.
- stat: TimeSeriesEngineStat
Descriptive statistics related to the streaming engine.
- classmethod create(name, table_schema, outputs, window_size, step, metrics, *, time_col=None, use_system_time=False, key_col=None, garbage_size=5000, update_time=None, use_window_start_time=False, round_time=True, snapshot_dir=None, snapshot_interval_in_msg_count=None, fill='none', force_trigger_time=None, key_purge_freq_in_sec=None, closed='left', output_elapsed_microseconds=False, sub_window=None, parallelism=1, accepted_delay=0, output_handler=None, msg_as_table=False)
Creates a time-series streaming engine with the specified parameters.
- Parameters:
name (str) – The name of the engine. Can contain letters, numbers, and “_”, and must start with a letter.
table_schema (Union[Table, TypeDict]) – Column names and types of the input stream. If a Table is provided, its schema must match the subscribed stream table.
outputs (Table) – The output table for results. Can be in-memory or DFS. Create an empty table and specify column names and types before calling create. Output columns:
- The first column is of TIMESTAMP type. If use_system_time is True, it stores the calculation start time; if False, it uses the time_col values.
- The next column is key_col (if specified).
- If output_elapsed_microseconds is True, add a LONG column and an INT column.
- The remaining columns store metric results. If a metric result is an array vector, the output column must be of array vector type.
window_size (int or list of int) – Size(s) of the calculation windows.
step (int) – Step size for moving windows. window_size must be divisible by step. The unit depends on use_system_time:
- If True, the unit is millisecond.
- If False, the unit matches that of time_col.
metrics (MetaCode or AnyVector) – Calculation formulas. Can be:
- Aggregate functions, e.g., <[sum(qty), avg(price)]>.
- Expressions on previous results, e.g., <[avg(price1)-avg(price2)]>.
- Calculations on multiple columns, e.g., <[std(price1-price2)]>.
- Functions with multiple returns, e.g., <func(price) as `col1`col2>.
Column names in metrics are not case-sensitive and can differ from input table columns. Nested aggregate functions are not supported.
time_col (Optional[Union[List[str], str]], optional) – Time column(s) for the stream table. Default is None.
use_system_time (bool, optional) – Whether to use system time for calculations. Default is False.
key_col (Optional[Union[List[str], str]], optional) – Grouping column(s). Default is None.
garbage_size (int, optional) – Threshold for garbage collection of historical data. Default is 5000.
update_time (Optional[int], optional) – Interval to trigger window calculations before window ends. Default is None.
use_window_start_time (bool, optional) – Whether output table time column uses window start time. Default is False.
round_time (bool, optional) – Align window boundary by alignment rule. Default is True.
snapshot_dir (Optional[Union[Path, str]], optional) – Directory to save engine snapshot. Default is None.
snapshot_interval_in_msg_count (Optional[int], optional) – Number of messages before saving next snapshot. Default is None.
fill (Union[Literal["none", "null", "ffill"], Constant, List[Union[Literal["null", "ffill"], Constant]]], optional) – Filling method(s) for empty windows in a group. Default is "none".
force_trigger_time (Optional[int], optional) – Waiting time to force trigger calculation in uncalculated windows. Default is None.
key_purge_freq_in_sec (Optional[int], optional) – Interval in seconds to remove inactive groups. Default is None.
closed (Literal["left", "right"], optional) – Whether left or right boundary is included in window. Default is “left”.
output_elapsed_microseconds (bool, optional) – Whether to output elapsed time (in microseconds). Default is False.
sub_window (Optional[Union[int, Constant]], optional) – Range of the subwindow within the window defined by window_size. Default is None.
parallelism (int, optional) – Number of worker threads for parallel computation. Default is 1.
accepted_delay (int, optional) – Maximum delay for each window to accept data. Default is 0.
output_handler (Optional[FunctionDef], optional) – Unary or partial function to handle output. If set, engine does not write results to output table directly. Default is None.
msg_as_table (bool, optional) – Whether output data is passed to output_handler as a table or as an AnyVector. Default is False.
- Returns:
Instance for further configuration and execution.
- Return type:
Examples
>>> import swordfish as sf
>>> table_schema = {"timestamp": "DATETIME", "sensor_id": "LONG",
...                 "temperature": "DOUBLE", "humidity": "DOUBLE"}
>>> output_table_1 = sf.table(types={"timestamp": "DATETIME",
...                                  "sensor_id": "LONG", "temperature": "DOUBLE"})
>>> output_table_2 = sf.table(types={"timestamp": "DATETIME",
...                                  "sensor_id": "LONG", "humidity": "DOUBLE"})
>>> my_engine = sf.engine.TimeSeriesEngine.create(
...     name="SensorTimeSeriesEngine",
...     table_schema=table_schema,
...     outputs=[output_table_1, output_table_2],
...     window_size=5,
...     step=1,
...     metrics=["temperature", "humidity"],
...     time_col="timestamp",
...     use_system_time=True,
...     key_col="sensor_id",
...     garbage_size=5000,
...     update_time=1000,
...     snapshot_dir="/path/to/snapshot/dir",
...     snapshot_interval_in_msg_count=100,
...     fill="ffill",
...     parallelism=4,
...     accepted_delay=10,
...     output_handler=None,
...     msg_as_table=True,
... ).submit()
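The effect of the fill parameter on empty windows can be pictured with a plain-Python sketch that does not call swordfish. It assumes the usual meanings of the options: "none" omits empty windows, "null" emits an empty value, "ffill" repeats the previous window's result, and any other value is treated as a constant. The function fill_windows and its arguments are hypothetical names used only for this illustration.

```python
def fill_windows(results, starts, fill="none"):
    """Build output rows for every window start.

    results: {window_start: value} for windows that produced a result.
    starts:  all window starts in chronological order.
    """
    rows = []
    previous = None
    for start in starts:
        if start in results:
            previous = results[start]
            rows.append((start, previous))
        elif fill == "none":
            continue                        # empty windows are not output
        elif fill == "null":
            rows.append((start, None))      # None stands in for a null value
        elif fill == "ffill":
            rows.append((start, previous))  # repeat the last computed result
        else:
            rows.append((start, fill))      # anything else is used as a constant
    return rows


results = {0: 1.5, 120: 2.5}   # the window starting at 60 received no data
starts = [0, 60, 120]
print(fill_windows(results, starts, "none"))   # [(0, 1.5), (120, 2.5)]
print(fill_windows(results, starts, "null"))   # [(0, 1.5), (60, None), (120, 2.5)]
print(fill_windows(results, starts, "ffill"))  # [(0, 1.5), (60, 1.5), (120, 2.5)]
print(fill_windows(results, starts, 0.0))      # [(0, 1.5), (60, 0.0), (120, 2.5)]
```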