StreamTable#
- class swordfish._swordfishcpp.StreamTable#
Stream tables are tables that support real-time data ingestion.
- enable_persistence(*, asyn_write: bool = True, compress: bool = True, cache_size: int | None = None, retention_minutes: int = 1440, flush_mode: Literal['async', 'sync'] = 'async', pre_cache: int | None = None) Self#
- enable_persistence(*, asyn_write: bool = True, compress: bool = True, retention_minutes: int = 1440, flush_mode: Literal['async', 'sync'] = 'async', pre_cache: int | None = None, cache_purge_time_column: str | None = None, cache_purge_interval: Duration | None = None, cache_retention_time: Duration | None = None) Self
Enables persistence for the stream table, allowing different configurations for cache purge.
Prerequisites
To enable persistence, specify the persistenceDir configuration. The persistence location of the table is <PERSISTENCE_DIR>/<TABLE_NAME>. The directory contains data files (named like data0.log, data1.log, …) and an index file index.log. Data that has been persisted to disk is loaded back into memory when Swordfish restarts.
Persistence Modes
The parameter asyn_write tells the system whether table persistence runs in asynchronous mode. In asynchronous mode (the default), new data is pushed to a queue and persistence threads write it to disk later. In synchronous mode, the table append operation blocks until the new data has been persisted to disk. In general, asynchronous mode achieves higher throughput.
In asynchronous mode, persistence for a table is handled by a single persistence thread, and one persistence thread may handle multiple tables. If only one table needs to be persisted, increasing the number of persistence threads does not improve performance.
Note that if asynchronous mode is enabled for data persistence or flush, data loss may occur if the server crashes.
Cache Purge Settings
Stream tables keep all data in memory by default. To prevent excessive memory usage, you can clear cached data using either of the following methods:
Cache purge by size: Set cache_size to specify a threshold for the number of records retained. Older records exceeding the threshold are removed. The threshold is determined as follows:
If the number of records appended in one batch does not exceed cache_size, the threshold is 2.5 * cache_size.
If the number of records appended in one batch exceeds cache_size, the threshold is 1.2 * (appended records + cache_size).
Cache purge by time: Set cache_purge_time_column, cache_purge_interval and cache_retention_time. The system cleans up data based on cache_purge_time_column. Each time a new record arrives, the system computes the time difference between the new record and the oldest record kept in memory. If the difference exceeds cache_purge_interval, the system retains only the data with timestamps within cache_retention_time of the new data.
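The size-based threshold rule above can be sketched in pure Python. This is a minimal illustration; `purge_threshold` is a hypothetical helper name, and the actual computation happens inside Swordfish:

```python
def purge_threshold(cache_size: int, batch_records: int) -> float:
    """Maximum records kept in memory before older rows are purged."""
    if batch_records <= cache_size:
        # Small batches: retain up to 2.5x the configured cache size.
        return 2.5 * cache_size
    # Large batches: retain up to 1.2x (appended records + cache size).
    return 1.2 * (batch_records + cache_size)

print(purge_threshold(1024, 500))   # batch below cache_size -> 2560.0
print(purge_threshold(1024, 4000))  # batch above cache_size
```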
- Parameters:
asyn_write (bool, optional) – Whether to enable asynchronous writes. Defaults to True.
compress (bool, optional) – Whether to save a table to disk in compression mode. Defaults to True.
cache_size (Optional[int], optional) – Used to determine the maximum number of records to retain in memory. Defaults to None.
retention_minutes (int, optional) – How long (in minutes) a log file larger than 1 GB is kept after its last update. Defaults to 1440.
flush_mode ({'async', 'sync'}, optional) – The disk flush mode. Set to "sync" to enable synchronous disk flush. Defaults to "async".
pre_cache (Optional[int], optional) – The number of records to be loaded into memory from the persisted stream table on disk when Swordfish is initialized. Defaults to None.
cache_purge_time_column (Optional[str], optional) – The time column in the stream table. Defaults to None.
cache_purge_interval (Optional[Duration], optional) – The interval to trigger cache purge. Defaults to None.
cache_retention_time (Optional[Duration], optional) – The retention time of cached data. Must be smaller than cache_purge_interval. Defaults to None.
- Returns:
The StreamTable with persistence enabled.
- Return type:
Self
Examples
- Enable persistence and set cache purge by size:
>>> import swordfish as sf
>>> table = sf.streaming.table(names=["id", "name", "age", "created_at"],
...     types=["INT", "STRING", "INT", "TIMESTAMP"], size=0, capacity=10)
>>> table.share("my_table")
>>> table.enable_persistence(
...     asyn_write=True,
...     compress=False,
...     cache_size=1024,
...     retention_minutes=720,
...     flush_mode="sync",
...     pre_cache=100,
... )
- Enable persistence and set cache purge by time:
>>> table.enable_persistence(
...     asyn_write=True,
...     compress=False,
...     retention_minutes=720,
...     flush_mode="sync",
...     pre_cache=100,
...     cache_purge_time_column="created_at",
...     cache_purge_interval=sf.data.Duration("2H"),
...     cache_retention_time=sf.data.Duration("10m"),
... )
- enable_cache_purge(*, cache_size: int | None = None) Self#
- enable_cache_purge(*, cache_purge_time_column: str | None = None, cache_purge_interval: Duration | None = None, cache_retention_time: Duration | None = None) Self
Enables cache purge for a non-persisted stream table.
To prevent excessive memory usage, you can clear cached data using either of the following methods:
Cache purge by size: Set cache_size to specify a threshold for the number of records retained. Older records exceeding the threshold are removed. The threshold is determined as follows:
If the number of records appended in one batch does not exceed cache_size, the threshold is 2.5 * cache_size.
If the number of records appended in one batch exceeds cache_size, the threshold is 1.2 * (appended records + cache_size).
Cache purge by time: Set cache_purge_time_column, cache_purge_interval and cache_retention_time. The system cleans up data based on cache_purge_time_column. Each time a new record arrives, the system computes the time difference between the new record and the oldest record kept in memory. If the difference exceeds cache_purge_interval, the system retains only the data with timestamps within cache_retention_time of the new data.
Note
If a record has not been enqueued for publishing, it will not be removed.
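The time-based purge rule can be illustrated with a minimal pure-Python sketch. `purge_by_time` is a hypothetical helper for illustration only; actual purging happens inside Swordfish:

```python
from datetime import datetime, timedelta

def purge_by_time(records, new_time, purge_interval, retention_time):
    """records: (timestamp, row) pairs ordered oldest-first."""
    # Purge triggers only if the oldest cached record is further from
    # the new record than cache_purge_interval.
    if records and new_time - records[0][0] > purge_interval:
        cutoff = new_time - retention_time
        # Keep only rows within cache_retention_time of the new record.
        records = [r for r in records if r[0] >= cutoff]
    return records

t0 = datetime(2024, 1, 1, 9, 0)
cache = [(t0, "a"), (t0 + timedelta(hours=1), "b"), (t0 + timedelta(hours=2), "c")]
# New record arrives 2.5 hours after the oldest cached one, exceeding the
# 2-hour interval, so only rows within the last 40 minutes are kept.
kept = purge_by_time(cache, t0 + timedelta(hours=2, minutes=30),
                     purge_interval=timedelta(hours=2),
                     retention_time=timedelta(minutes=40))
```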
- Parameters:
cache_size (Optional[int], optional) – Used to determine the maximum number of records to retain in memory. Defaults to None.
cache_purge_time_column (Optional[str], optional) – The time column in the stream table. Defaults to None.
cache_purge_interval (Optional[Duration], optional) – The interval to trigger cache purge. Defaults to None.
cache_retention_time (Optional[Duration], optional) – The retention time of cached data. Must be smaller than cache_purge_interval. Defaults to None.
- Returns:
The StreamTable with cache purge enabled.
- Return type:
Self
Examples
- Cache purge by size:
>>> table.enable_cache_purge(cache_size=1024)
- Cache purge by time:
>>> table.enable_cache_purge(
...     cache_purge_time_column="created_at",
...     cache_purge_interval=sf.data.Duration("2H"),
...     cache_retention_time=sf.data.Duration("10m"),
... )
- disable_persistence()#
Disables the table’s persistence to disk. Future updates to the table will not be persisted.
- Returns:
Return the current table instance.
- Return type:
Self
Examples
>>> table.disable_persistence()
- clear_persistence()#
Stops the table’s persistence to disk and deletes the persisted table data on disk; the table schema is retained.
- Returns:
Return the current table instance.
- Return type:
Self
Examples
>>> table.clear_persistence()
- set_timestamp_column(name)#
Set the timestamp column in the table.
- Parameters:
name (str) – The name of the column to be set as the timestamp column.
- Returns:
Return the current table instance.
- Return type:
Self
Examples
>>> table.set_timestamp_column("column_name")
- set_filter_column(name)#
Set the filter column in the table.
- Parameters:
name (str) – The name of the column to be set as the filter column.
- Returns:
Return the current table instance.
- Return type:
Self
Examples
>>> table.set_filter_column("column_name")
- property is_persisted: bool#
Check whether the table has been persisted.
- Returns:
True if the table is persisted, False otherwise.
- Return type:
bool
- property is_cache_purge: bool#
Check whether cache purging is enabled for the table.
- Returns:
True if cache purging is enabled, False otherwise.
- Return type:
bool
- property persistence_meta: PersistenceMetaInfo#
Retrieve metadata information related to the table’s persistence.
- Returns:
The PersistenceMetaInfo for the table, which includes details about persistence settings.
- Return type:
PersistenceMetaInfo
- property info: StreamTableInfo#
Retrieve information about the stream table.
- Returns:
The StreamTableInfo for the table.
- Return type:
StreamTableInfo
- property timestamp_column: str#
Get the name of the timestamp column used in the table.
- Returns:
The name of the timestamp column.
- Return type:
str
- property filter_column: str#
Get the name of the filter column used in the table for filtering data.
- Returns:
The name of the filter column.
- Return type:
str
- subscribe(action_name, handler, *, offset=-1, msg_as_table=False, batch_size=0, throttle=1, hash=-1, reconnect=False, filter=None, persist_offset=False, time_trigger=False, handler_need_msg_id=False)#
Subscribes to a stream table on a local or remote server. You can also specify a handler to process the subscribed data.
Submits and returns the subscription topic, which is a combination of the alias of the node where the stream table is located, the stream table name, and the subscription task name (action_name), separated by “_”. If the subscription topic already exists, an exception is thrown.
If batch_size is specified, handler is triggered when either the number of unprocessed messages reaches batch_size or the time elapsed since handler was last triggered reaches throttle seconds.
If the subscribed table is overwritten, to keep the subscription you need to cancel it with Topic.unsubscribe and then subscribe to the new table.
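The batch_size/throttle trigger condition can be sketched as follows. `should_trigger` is a hypothetical name used for illustration; the real scheduling is internal to Swordfish:

```python
def should_trigger(unprocessed: int, seconds_since_last: float,
                   batch_size: int, throttle: float) -> bool:
    """True when the handler should fire for the pending messages."""
    # The handler fires when either the count condition or the
    # time condition is met, whichever comes first.
    return unprocessed >= batch_size or seconds_since_last >= throttle

print(should_trigger(10, 0.2, batch_size=10, throttle=1.0))  # count reached
print(should_trigger(3, 1.5, batch_size=10, throttle=1.0))   # throttle elapsed
```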
Here is how to set the socket buffer size in Linux:
In the Linux terminal, run the following commands:
sudo sysctl -w net.core.rmem_default=1048576
sudo sysctl -w net.core.rmem_max=1048576
sudo sysctl -w net.core.wmem_default=1048576
sudo sysctl -w net.core.wmem_max=1048576
Alternatively, add or modify the values of net.core.rmem_default, net.core.rmem_max, net.core.wmem_default and net.core.wmem_max to 1048576 in the /etc/sysctl.conf file, and then run sudo sysctl -p.
- Parameters:
action_name – A string indicating subscription task name. It starts with a letter and can have letters, digits, and underscores.
handler –
A unary/binary function or a table, which is used to process the subscribed data.
If handler is a unary function, its only parameter is the subscribed data, which can be a Table or an AnyVector of the subscribed table columns. handler must be specified as a binary function when handler_need_msg_id = True; the parameters of the function are msg_body and msg_id. For details, see handler_need_msg_id.
If handler is a table, the subscribed data is inserted into it directly. It can be a streaming engine, a shared table (including stream table, in-memory table, keyed table, indexed table), or a DFS table.
offset (int, optional) – The position of the first message where the subscription begins. Defaults to -1.
msg_as_table (bool, optional) – Indicates whether the subscribed data is ingested into handler as a Table or as an AnyVector. Defaults to False.
batch_size (int, optional) – The number of unprocessed messages to trigger the handler. Defaults to 0.
throttle (float, optional) – The maximum number of seconds to wait before the handler processes the incoming messages if the batch_size condition has not been met. Defaults to 1.
hash (int, optional) – A hash value indicating which subscription executor will process the incoming messages for this subscription. Defaults to -1.
reconnect (bool, optional) – Whether to automatically attempt to resume the subscription if it is interrupted. Defaults to False.
filter (optional) – The filter condition(s). Defaults to None.
persist_offset (bool, optional) – Whether to persist the offset of the last processed message in the current subscription. Defaults to False.
time_trigger (bool, optional) – If True, handler is triggered at the intervals specified by throttle even if no new messages arrive. Defaults to False.
handler_need_msg_id (bool, optional) – Determines the required parameters for the handler. If True, the handler must accept both msg_body (messages to be ingested) and msg_id (ID of the last ingested message). If False, the handler only requires msg_body. Defaults to False.
- Returns:
An instance of SubscriptionHelper that allows further configuration and submit of the subscription. This object enables setting up the optional parameters.
- Return type:
SubscriptionHelper
Examples
>>> def my_handler(message):
...     print(f"Received message: {message}")
...
>>> subscription = table.subscribe(
...     action_name="action_name",
...     handler=my_handler,
...     offset=0,
...     batch_size=10,
...     reconnect=True,
...     persist_offset=True,
... )
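When handler_need_msg_id=True, the handler must be binary. A minimal sketch of such a handler follows; the subscribe call is shown as a comment because it assumes a live table, and all parameter values are illustrative:

```python
received = []

def id_handler(msg_body, msg_id):
    # msg_body: the ingested messages; msg_id: ID of the last ingested message.
    received.append((msg_id, msg_body))

# Assumed usage with the subscribe signature documented above:
# table.subscribe(action_name="with_ids", handler=id_handler,
#                 handler_need_msg_id=True, batch_size=10)

# Simulate one delivery to show the calling convention:
id_handler(["row-1", "row-2"], 42)
```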