Subscription

StreamTable.subscribe(action_name, handler, *, offset=-1, msg_as_table=False, batch_size=0, throttle=1, hash=-1, reconnect=False, filter=None, persist_offset=False, time_trigger=False, handler_need_msg_id=False)

Subscribes to a stream table on a local or remote server. The handler argument specifies how the subscribed data is processed.

Submitting the subscription returns the subscription topic, which is a combination of the alias of the node where the stream table is located, the stream table name, and the subscription task name (action_name), separated by "_". If the subscription topic already exists, an exception is thrown.

  • If batch_size is specified, handler is triggered when either the number of unprocessed messages reaches batch_size or the time elapsed since handler was last triggered reaches throttle seconds.

  • If the subscribed table is overwritten, keep the subscription by canceling it with Topic.unsubscribe and then subscribing to the new table.
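The batch_size/throttle rule above can be sketched as a small predicate. This is only an illustration of the stated rule, not the server's implementation; the function name should_trigger and its arguments pending and elapsed are hypothetical.

```python
def should_trigger(pending: int, elapsed: float, batch_size: int, throttle: float) -> bool:
    """Illustrative only: mirrors the documented rule, not the real scheduler.

    pending  -- number of unprocessed messages (hypothetical name)
    elapsed  -- seconds since the handler was last triggered (hypothetical name)
    """
    if batch_size <= 0:
        # The rule above is only stated for the case where batch_size is specified.
        raise ValueError("this sketch only covers batch_size > 0")
    # Either condition is sufficient to trigger the handler.
    return pending >= batch_size or elapsed >= throttle


print(should_trigger(pending=10, elapsed=0.2, batch_size=10, throttle=1.0))  # True: batch is full
print(should_trigger(pending=3, elapsed=1.5, batch_size=10, throttle=1.0))   # True: throttle elapsed
print(should_trigger(pending=3, elapsed=0.2, batch_size=10, throttle=1.0))   # False: neither condition met
```

With batch_size=10 and throttle=1, a slow stream still reaches the handler about once per second, while a burst of 10 or more messages is handed over as soon as it accumulates.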

Here is how to set the socket buffer size in Linux:

  • In the Linux terminal, run the following commands:

sudo sysctl -w net.core.rmem_default=1048576
sudo sysctl -w net.core.rmem_max=1048576
sudo sysctl -w net.core.wmem_default=1048576
sudo sysctl -w net.core.wmem_max=1048576
  • Alternatively, set net.core.rmem_default, net.core.rmem_max, net.core.wmem_default and net.core.wmem_max to 1048576 in the /etc/sysctl.conf file (adding any entries that are missing), and then run sudo sysctl -p.
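To confirm that the settings above took effect, the current values can be read back from /proc/sys/net/core, where Linux exposes the net.core.* sysctl keys. The following is a minimal sketch; the helper names read_socket_buffer_sizes and undersized are hypothetical, and treating any value below 1048576 as too small is an assumption of this example.

```python
from pathlib import Path

# The four keys set by the sysctl commands above.
KEYS = ("rmem_default", "rmem_max", "wmem_default", "wmem_max")


def read_socket_buffer_sizes(base="/proc/sys/net/core"):
    """Return {key: int or None}; None means the value could not be read."""
    sizes = {}
    for key in KEYS:
        try:
            sizes[key] = int((Path(base) / key).read_text().strip())
        except (OSError, ValueError):
            sizes[key] = None  # e.g. not on Linux, or insufficient permissions
    return sizes


def undersized(sizes, target=1048576):
    """List the keys that are unreadable or below target (assumed threshold)."""
    return [key for key, value in sizes.items() if value is None or value < target]


current = read_socket_buffer_sizes()
print(current)
print("needs attention:", undersized(current))
```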

Parameters:
  • action_name – A string indicating subscription task name. It starts with a letter and can have letters, digits, and underscores.

  • handler

    A unary/binary function or a table, which is used to process the subscribed data.

    • If handler is a unary function, the only parameter of the function is the subscribed data, which can be a Table or an AnyVector of the subscribed table columns.

    • handler must be specified as a binary function when handler_need_msg_id = True. The parameters of the function are msg_body and msg_id. For details, see handler_need_msg_id.

    • If handler is a table, the subscribed data will be inserted into it directly. It can be a streaming engine, a shared table (including stream table, in-memory table, keyed table, indexed table), or a DFS table.

  • offset (int, optional) – The position of the first message where the subscription begins. Defaults to -1.

  • msg_as_table (bool, optional) – Indicates whether the subscribed data is ingested into handler as a Table or as an AnyVector. Defaults to False.

  • batch_size (int, optional) – The number of unprocessed messages to trigger the handler. Defaults to 0.

  • throttle (float, optional) – The maximum waiting seconds before the handler processes the incoming messages if the batch_size condition has not been reached. Defaults to 1.

  • hash (int, optional) – A hash value indicating which subscription executor will process the incoming messages for this subscription. Defaults to -1.

  • reconnect (bool, optional) – Specifies whether to automatically attempt to resume the subscription if interrupted. Defaults to False.

  • filter (optional) – The filter condition(s). Defaults to None.

  • persist_offset (bool, optional) – Indicates whether to persist the offset of the last processed message in the current subscription. Defaults to False.

  • time_trigger (bool, optional) – If set to True, handler will be triggered at the intervals specified by throttle even if no new messages arrive. Defaults to False.

  • handler_need_msg_id (bool, optional) – Determines the required parameters for the handler. If True, the handler must accept both msg_body (messages to be ingested) and msg_id (ID of the last ingested message). If False, the handler only requires msg_body. Defaults to False.

Returns:

An instance of SubscriptionHelper that allows further configuration and submission of the subscription. This object enables setting up the optional parameters.

Return type:

SubscriptionHelper

Examples

>>> def my_handler(message):
...     print(f"Received message: {message}")
...
>>> subscription = table.subscribe(
...     action_name="action_name",
...     handler=my_handler,
...     offset=0,
...     batch_size=10,
...     reconnect=True,
...     persist_offset=True
... )

StreamTable

Stream tables are tables that support real-time data ingestion.

SubscriptionHelper

A helper class for managing the stream subscription, allowing further configuration and submission of the subscription.

Topic

StreamTableInfo

PersistenceMetaInfo

table

Creates a StreamTable from data, schema, or empty specification.

topic

Retrieves a Topic by its associated StreamTable name and action name, or by its name.

exists_topic

Checks if a specific action exists for a given StreamTable, or if a Topic exists.

list_shared_tables

Retrieves the names of all shared StreamTables.

list_unloaded_persisted_tables

Retrieves the names of all unloaded persisted StreamTables.

exists

Checks if a StreamTable with the specified name exists.

drop

Drops the StreamTable with the specified name.