TimeSeriesEngine#

class swordfish._swordfishcpp.TimeSeriesEngine#

The time-series streaming engine conducts real-time time-series calculations with moving windows.

TimeSeriesEngine.create returns a builder object; call submit on the builder to create an engine object into which you can ingest data for stream processing.

There are two types of aggregate operators in the time-series engine: incremental operators and full operators. Incremental operators incrementally aggregate the data as they arrive without keeping the historical data. Full operators (e.g., user-defined aggregate functions, unoptimized built-in aggregate functions, or functions with nested state functions) keep all the data in a window and recompute the output as a full refresh whenever new data arrives.
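
The difference can be illustrated with a conceptual sketch in plain Python (this is not swordfish code; the class names are invented): an incremental operator maintains only a running state, while a full operator retains the window's records and recomputes on every arrival.

# Conceptual sketch (plain Python, not swordfish code): contrasting the two
# kinds of operators.

class IncrementalSum:
    """Incremental operator: keeps only a running state, never the raw records."""
    def __init__(self):
        self.state = 0.0

    def ingest(self, value):
        self.state += value     # O(1) update per arriving record
        return self.state       # current result for the window

class FullTrimmedMean:
    """Full operator (e.g., a user-defined aggregate): keeps every record in the
    window and recomputes the result from scratch whenever new data arrives."""
    def __init__(self):
        self.window = []

    def ingest(self, value):
        self.window.append(value)
        data = sorted(self.window)
        trimmed = data[1:-1] or data        # drop min/max once there are >2 records
        return sum(trimmed) / len(trimmed)  # full refresh over the retained window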

The following aggregate operators in the time-series engine are optimized for incremental computations: corr, covar, first, last, max, med, min, percentile, quantile, std, var, sum, sum2, sum3, sum4, wavg, wsum, count, firstNot, ifirstNot, lastNot, ilastNot, imax, imin, nunique, prod, sem, mode, searchK, beta, avg.

Windowing Logic

Window boundaries: The engine automatically adjusts the starting point of the first window. (See the parameter descriptions for step and round_time, and the Alignment Rules section.)

Window properties:

  • window_size - the size of each window.

  • step - the duration of time between the start of two adjacent windows.

  • closed - whether the left/right boundary of a window is inclusive or exclusive.

  • use_system_time - whether values are windowed based on the time column in the data or on the system time when the data is ingested.
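
As an illustration only (plain Python, not swordfish code), the sketch below enumerates the window boundaries produced by given window_size, step, and closed settings; note that the real engine additionally aligns the start of the first window as described under Alignment Rules.

# Illustration only (plain Python): enumerate window boundaries for a given
# window_size, step, and closed setting. Times are plain integers in the unit
# of the time column (e.g., seconds for a DATETIME column).

def windows(start, end, window_size, step, closed="left"):
    """Yield formatted boundaries of each window whose start lies in [start, end)."""
    t = start
    while t < end:
        lo, hi = t, t + window_size
        # closed='left'  -> [lo, hi): left boundary inclusive, right exclusive
        # closed='right' -> (lo, hi]: left boundary exclusive, right inclusive
        yield f"[{lo}, {hi})" if closed == "left" else f"({lo}, {hi}]"
        t += step

for w in windows(0, 20, window_size=10, step=5, closed="left"):
    print(w)   # [0, 10)  [5, 15)  [10, 20)  [15, 25)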

Calculation Rules

  • If time_col is specified, its values must be increasing. If key_col is specified to group the data, the values in time_col must be increasing within each group specified by key_col. Otherwise, out-of-order data will be discarded.

  • If use_system_time = true, the calculation of a window is triggered as soon as the window ends. If use_system_time = false (with time_col specified), the calculation of a window is triggered by the arrival of the next record after the window ends. To trigger the calculation for the uncalculated windows, you can specify the parameter update_time or force_trigger_time.

  • If fill is unspecified or "none", only windows with calculation results are output. If fill is set to another value, all windows are output and empty windows are filled using the specified filling method (see the configuration sketch after this list).

  • If update_time = 0, incoming records in the current window can be immediately calculated and output.
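
A hedged configuration sketch tying these rules to the create parameters (the engine name, schema, and values below are hypothetical; metrics are written as strings, mirroring the Examples section):

>>> import swordfish as sf
>>> # Data-driven windows (use_system_time=False): the "ts" column drives
>>> # windowing, empty windows are filled with NULL, and windows that would
>>> # otherwise remain uncalculated are force-triggered after the configured wait.
>>> schema = {"ts": "DATETIME", "device": "LONG", "price": "DOUBLE"}
>>> out = sf.table(types={"ts": "DATETIME", "device": "LONG", "avg_price": "DOUBLE"})
>>> engine = sf.engine.TimeSeriesEngine.create(
...     name="demoTSEngine",
...     table_schema=schema,
...     outputs=out,
...     window_size=60,
...     step=60,
...     metrics=["avg(price)"],
...     time_col="ts",
...     use_system_time=False,
...     key_col="device",
...     fill="null",
...     force_trigger_time=1000,
...     closed="left",
... ).submit()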

Other Features

  • Data/state cleanup: You can set a cleanup rule to clear historical data. (See parameters key_purge_filter and key_purge_freq_in_sec)

  • Snapshot: Snapshot mechanism is used to restore the streaming engine to the latest snapshot after system interruption. (See parameters snapshot_dir and snapshot_interval_in_msg_count)

Alignment Rules

To facilitate observation and comparison of calculation results, the engine automatically adjusts the starting point of the first window. The alignment size (an integer) is determined by the parameters step and round_time and by the precision of time_col. When the time-series engine calculates within groups, the windows of all groups are uniformly aligned, and each window has the same boundaries across groups.

  • Case 1: time_col is of MINUTE type (HH:mm)

    If round_time = False:

    step      | alignment_size
    0 ~ 2     | 2
    3         | 3
    4 ~ 5     | 5
    6 ~ 10    | 10
    11 ~ 15   | 15
    16 ~ 20   | 20
    21 ~ 30   | 30
    > 30      | 60 (1 hour)

    If round_time = True:

    • The value of alignment_size is the same as in the above table if step ≤ 30.

    • If step > 30, then:

      step        | alignment_size
      31 ~ 60     | 60 (1 hour)
      61 ~ 120    | 120 (2 hours)
      121 ~ 180   | 180 (3 hours)
      181 ~ 300   | 300 (5 hours)
      301 ~ 600   | 600 (10 hours)
      601 ~ 900   | 900 (15 hours)
      901 ~ 1200  | 1200 (20 hours)
      1201 ~ 1800 | 1800 (30 hours)
      > 1800      | 3600 (60 hours)

  • Case 2: time_col is of DATETIME type (yyyy-MM-dd HH:mm:ss) or SECOND type (HH:mm:ss)

    If round_time = False:

    step      | alignment_size
    0 ~ 2     | 2
    3         | 3
    4 ~ 5     | 5
    6 ~ 10    | 10
    11 ~ 15   | 15
    16 ~ 20   | 20
    21 ~ 30   | 30
    > 30      | 60 (1 minute)

    If round_time = True:

    • The value of alignment_size is the same as in the above table if step ≤ 30.

    • If step > 30, then:

      step        | alignment_size
      31 ~ 60     | 60 (1 minute)
      61 ~ 120    | 120 (2 minutes)
      121 ~ 180   | 180 (3 minutes)
      181 ~ 300   | 300 (5 minutes)
      301 ~ 600   | 600 (10 minutes)
      601 ~ 900   | 900 (15 minutes)
      901 ~ 1200  | 1200 (20 minutes)
      1201 ~ 1800 | 1800 (30 minutes)
      > 1800      | 3600 (1 hour)

  • Case 3: time_col is of TIMESTAMP type (yyyy-MM-dd HH:mm:ss.mmm) or TIME type (HH:mm:ss.mmm)

    If round_time = False (step values are in milliseconds):

    step      | alignment_size
    0 ~ 2     | 2
    3 ~ 5     | 5
    6 ~ 10    | 10
    11 ~ 20   | 20
    21 ~ 25   | 25
    26 ~ 50   | 50
    51 ~ 100  | 100
    101 ~ 200 | 200
    201 ~ 250 | 250
    251 ~ 500 | 500
    > 500     | 1000 (1 second)

    If round_time = True:

    • The value of alignment_size is the same as in the above table if step ≤ 30000.

    • If step > 30000, then:

      step              | alignment_size
      30001 ~ 60000     | 60000 (1 minute)
      60001 ~ 120000    | 120000 (2 minutes)
      120001 ~ 300000   | 300000 (5 minutes)
      300001 ~ 600000   | 600000 (10 minutes)
      600001 ~ 900000   | 900000 (15 minutes)
      900001 ~ 1200000  | 1200000 (20 minutes)
      1200001 ~ 1800000 | 1800000 (30 minutes)
      > 1800000         | 3600000 (1 hour)

  • Case 4: time_col is of NANOTIMESTAMP type (yyyy-MM-dd HH:mm:ss.nnnnnnnnn) or NANOTIME type (HH:mm:ss.nnnnnnnnn)

    If round_time = False:

    step          | alignment_size
    0 ~ 2ns       | 2ns
    3ns ~ 5ns     | 5ns
    6ns ~ 10ns    | 10ns
    11ns ~ 20ns   | 20ns
    21ns ~ 25ns   | 25ns
    26ns ~ 50ns   | 50ns
    51ns ~ 100ns  | 100ns
    101ns ~ 200ns | 200ns
    201ns ~ 250ns | 250ns
    251ns ~ 500ns | 500ns
    > 500ns       | 1000ns

    If round_time = True:

    step         | alignment_size
    1000ns ~ 1ms | 1ms
    1ms ~ 10ms   | 10ms
    10ms ~ 100ms | 100ms
    100ms ~ 1s   | 1s
    1s ~ 2s      | 2s
    2s ~ 3s      | 3s
    3s ~ 5s      | 5s
    5s ~ 10s     | 10s
    10s ~ 15s    | 15s
    15s ~ 20s    | 20s
    20s ~ 30s    | 30s
    > 30s        | 1min

If the time of the first record is x with data type TIMESTAMP, the starting time of the first window is adjusted to timeType_cast(x/alignment_size*alignment_size + step - window_size), where "/" keeps only the integer part of the division. For example, if the time of the first record is 2018.10.08T01:01:01.365, window_size = 120000, and step = 60000, then alignment_size = 60000, and the starting time of the first window is timestamp(2018.10.08T01:01:01.365/60000*60000+60000-120000) = 2018.10.08T01:00:00.000.
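
The adjustment can be reproduced in plain Python (illustration only, not swordfish code; millisecond-since-epoch arithmetic stands in for the TIMESTAMP type):

# Illustration only (plain Python): reproduce the first-window alignment using
# millisecond-since-epoch arithmetic for a TIMESTAMP time column.
from datetime import datetime, timedelta, timezone

def first_window_start(first_ms, alignment_size, step, window_size):
    # "/" in the formula keeps only the integer part of the division
    return first_ms // alignment_size * alignment_size + step - window_size

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
first = datetime(2018, 10, 8, 1, 1, 1, 365000, tzinfo=timezone.utc)  # 2018.10.08T01:01:01.365
first_ms = (first - EPOCH) // timedelta(milliseconds=1)              # milliseconds since epoch

start_ms = first_window_start(first_ms, alignment_size=60000, step=60000, window_size=120000)
print(EPOCH + timedelta(milliseconds=start_ms))  # 2018-10-08 01:00:00+00:00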

engine_type: EngineType#

The type of the streaming engine.

stat: TimeSeriesEngineStat#

Descriptive statistics related to the streaming engine.

classmethod create(name, table_schema, outputs, window_size, step, metrics, *, time_col=None, use_system_time=False, key_col=None, garbage_size=5000, update_time=None, use_window_start_time=False, round_time=True, snapshot_dir=None, snapshot_interval_in_msg_count=None, fill='none', force_trigger_time=None, key_purge_freq_in_sec=None, closed='left', output_elapsed_microseconds=False, sub_window=None, parallelism=1, accepted_delay=0, output_handler=None, msg_as_table=False)#

Creates a time-series streaming engine with the specified parameters.

Parameters:
  • name (str) – The name of the engine. Can contain letters, numbers, and “_”, and must start with a letter.

  • table_schema (Union[Table, TypeDict]) – Column names and types of the input stream. If a Table is provided, its schema must match the subscribed stream table.

  • outputs (Table) –

    The output table for results. Can be in-memory or DFS. Create an empty table and specify column names and types before calling create. Output columns:

    • First column is TIMESTAMP type.

      • If use_system_time is True, stores calculation start time.

      • If False, uses time_col values.

    • The next column is the grouping column specified by key_col (if specified).

    • If output_elapsed_microseconds is True, two additional columns of LONG and INT type are added.

    • Remaining columns store metric results. If a metric result is an array vector, the output column must be array vector type.

  • window_size (int or list of int) – Size(s) of the calculation windows.

  • step (int) –

    Step size for moving windows. window_size must be divisible by step. Unit depends on use_system_time:

    • If True, unit is millisecond.

    • If False, unit matches time_col.

  • metrics (MetaCode or AnyVector) –

    Calculation formulas. Can be:

    • Aggregate functions, e.g., <[sum(qty), avg(price)]>.

    • Expressions on previous results, e.g., <[avg(price1)-avg(price2)]>.

    • Calculations on multiple columns, e.g., <[std(price1-price2)]>.

    • Functions with multiple returns, e.g., <func(price) as `col1`col2>.

    Column names in metrics are not case-sensitive and can differ from input table columns. Nested aggregate functions are not supported.

  • time_col (Optional[Union[List[str], str]], optional) – Time column(s) for the stream table. Default is None.

  • use_system_time (bool, optional) – Whether to use system time for calculations. Default is False.

  • key_col (Optional[Union[List[str], str]], optional) – Grouping column(s). Default is None.

  • garbage_size (int, optional) – Threshold for garbage collection of historical data. Default is 5000.

  • update_time (Optional[int], optional) – Interval to trigger window calculations before window ends. Default is None.

  • use_window_start_time (bool, optional) – Whether output table time column uses window start time. Default is False.

  • round_time (bool, optional) – Whether to align the window boundary according to the Alignment Rules above. Default is True.

  • snapshot_dir (Optional[Union[Path, str]], optional) – Directory to save engine snapshot. Default is None.

  • snapshot_interval_in_msg_count (Optional[int], optional) – Number of messages before saving next snapshot. Default is None.

  • fill (Union[Literal["none", "null", "ffill"], Constant, List[Union[Literal["null", "ffill"], Constant]]], optional) – Filling method(s) for empty windows in a group. Default is "none".

  • force_trigger_time (Optional[int], optional) – Waiting time to force trigger calculation in uncalculated windows. Default is None.

  • key_purge_freq_in_sec (Optional[int], optional) – Interval in seconds to remove inactive groups. Default is None.

  • closed (Literal["left", "right"], optional) – Whether left or right boundary is included in window. Default is “left”.

  • output_elapsed_microseconds (bool, optional) – Whether to output elapsed time (in microseconds). Default is False.

  • sub_window (Optional[Union[int, Constant]], optional) – Range of subwindow within window defined by window_size. Default is None.

  • parallelism (int, optional) – Number of worker threads for parallel computation. Default is 1.

  • accepted_delay (int, optional) – Maximum delay for each window to accept data. Default is 0.

  • output_handler (Optional[FunctionDef], optional) – A unary or partial function to handle the output. If set, the engine does not write results to the output table directly; instead, the results are passed to this function. Default is None.

  • msg_as_table (bool, optional) – Whether the output data passed to output_handler is a table (True) or an AnyVector (False). Default is False.

Returns:

A builder instance for further configuration; call submit on it to create the engine.

Return type:

TimeSeriesEngineBuilder

Examples

>>> import swordfish as sf
>>> table_schema = {"timestamp": "DATETIME", "sensor_id": "LONG",
...     "temperature": "DOUBLE", "humidity": "DOUBLE"}
>>> output_table_1 = sf.table(types={"timestamp": "DATETIME",
...     "sensor_id": "LONG", "temperature": "DOUBLE"})
>>> output_table_2 = sf.table(types={"timestamp": "DATETIME",
...     "sensor_id": "LONG", "humidity": "DOUBLE"})
>>> my_engine = sf.engine.TimeSeriesEngine.create(
...     name="SensorTimeSeriesEngine",
...     table_schema=table_schema,
...     outputs=[output_table_1, output_table_2],
...     window_size=5,
...     step=1,
...     metrics=["temperature", "humidity"],
...     time_col="timestamp",
...     use_system_time=True,
...     key_col="sensor_id",
...     garbage_size=5000,
...     update_time=1000,
...     snapshot_dir="/path/to/snapshot/dir",
...     snapshot_interval_in_msg_count=100,
...     fill="ffill",
...     parallelism=4,
...     accepted_delay=10,
...     output_handler=None,
...     msg_as_table=True,
... ).submit()
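
Once the engine has been submitted, it exposes the attributes documented above, for example (a continuation of the example; outputs omitted):

>>> my_engine.engine_type   # the EngineType of the streaming engine
>>> my_engine.stat          # descriptive statistics (TimeSeriesEngineStat)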