Subscription
- StreamTable.subscribe(action_name, handler, *, offset=-1, msg_as_table=False, batch_size=0, throttle=1, hash=-1, reconnect=False, filter=None, persist_offset=False, time_trigger=False, handler_need_msg_id=False)
Subscribes to a stream table on a local or remote server. We can also specify a handler to process the subscribed data.
Submitting the subscription returns the subscription topic, which is a combination of the alias of the node where the stream table is located, the stream table name, and the subscription task name (action_name), separated by “_”. If the subscription topic already exists, an exception is thrown.
If batch_size is specified, handler will be triggered when either the number of unprocessed messages reaches batch_size or the time since handler was last triggered reaches throttle seconds.
If the subscribed table is overwritten, to keep the subscription we need to cancel it with Topic.unsubscribe and then subscribe to the new table.
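The batch_size/throttle rule described above can be modelled in a few lines of plain Python. This is only a toy sketch of the triggering condition, assuming a positive batch_size; the class BatchThrottleTrigger and its methods are hypothetical names made up for illustration and are not part of the library, whose internal scheduling may differ.

```python
from typing import Callable, List


class BatchThrottleTrigger:
    """Toy model of the batch_size / throttle rule (hypothetical, not the library's code)."""

    def __init__(self, handler: Callable[[List[object]], None],
                 batch_size: int, throttle: float) -> None:
        self.handler = handler
        self.batch_size = batch_size
        self.throttle = throttle
        self.pending: List[object] = []
        self.last_fired = 0.0  # time of the last handler call, in seconds

    def _fire(self, now: float) -> None:
        # Hand all pending messages to the handler and reset the timer.
        batch, self.pending = self.pending, []
        self.last_fired = now
        self.handler(batch)

    def on_message(self, msg: object, now: float) -> None:
        """Buffer a message; fire once batch_size messages are pending."""
        self.pending.append(msg)
        if len(self.pending) >= self.batch_size:
            self._fire(now)

    def on_tick(self, now: float) -> None:
        """Fire pending messages once throttle seconds have passed since the last call."""
        if self.pending and now - self.last_fired >= self.throttle:
            self._fire(now)


batches: List[List[object]] = []
trigger = BatchThrottleTrigger(batches.append, batch_size=3, throttle=1.0)

# Three messages arrive quickly: the batch_size condition fires the handler.
for i, t in enumerate([0.1, 0.2, 0.3]):
    trigger.on_message(i, now=t)

# One more message arrives, then the throttle interval elapses.
trigger.on_message(3, now=0.4)
trigger.on_tick(now=1.0)  # only 0.7 s since the last call: nothing happens
trigger.on_tick(now=1.5)  # 1.2 s since the last call: handler fires

print(batches)
```

The first batch is released by the batch_size condition and the second by the throttle condition, which is the "either ... or" behaviour the description refers to.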
To set the socket buffer size in Linux, run the following commands in a terminal:
sudo sysctl -w net.core.rmem_default=1048576
sudo sysctl -w net.core.rmem_max=1048576
sudo sysctl -w net.core.wmem_default=1048576
sudo sysctl -w net.core.wmem_max=1048576
Alternatively, add or modify the entries for net.core.rmem_default, net.core.rmem_max, net.core.wmem_default and net.core.wmem_max in the /etc/sysctl.conf file so that each is set to 1048576, and then run sudo sysctl -p.
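When editing /etc/sysctl.conf by hand, it can help to check that all four keys ended up with the intended value. The following is a minimal sketch that parses sysctl.conf-style text and lists the keys that are missing or differ from 1048576. The helper names parse_sysctl and missing_or_wrong are made up for this example, and the sample text is illustrative rather than read from a real system.

```python
REQUIRED_KEYS = (
    "net.core.rmem_default",
    "net.core.rmem_max",
    "net.core.wmem_default",
    "net.core.wmem_max",
)
TARGET = 1048576


def parse_sysctl(text: str) -> dict:
    """Parse 'key = value' lines, skipping blank lines and # or ; comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", ";")) or "=" not in line:
            continue
        key, value = line.split("=", 1)
        settings[key.strip()] = value.strip()
    return settings


def missing_or_wrong(text: str) -> list:
    """Return the required keys that are absent or not set to TARGET."""
    settings = parse_sysctl(text)
    return [k for k in REQUIRED_KEYS if settings.get(k) != str(TARGET)]


# Example contents only; not read from the real /etc/sysctl.conf.
sample = """
# socket buffer settings
net.core.rmem_default = 1048576
net.core.rmem_max=1048576
net.core.wmem_default = 212992
"""

print(missing_or_wrong(sample))
```

To check a real machine, the same function could be given the text of /etc/sysctl.conf; an empty list would mean all four keys are set as described.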
- Parameters:
action_name – A string indicating subscription task name. It starts with a letter and can have letters, digits, and underscores.
handler – A unary/binary function or a table, which is used to process the subscribed data.
If handler is a unary function, the only parameter of the function is the subscribed data, which can be a Table or an AnyVector of the subscribed table columns.
handler must be specified as a binary function when handler_need_msg_id = True. The parameters of the function are msg_body and msg_id. For details, see handler_need_msg_id.
If handler is a table, the subscribed data will be inserted into it directly. It can be a streaming engine, a shared table (including stream table, in-memory table, keyed table, indexed table), or a DFS table.
offset (int, optional) – The position of the first message where the subscription begins. Defaults to -1.
msg_as_table (bool, optional) – Indicates whether the subscribed data is ingested into handler as a Table or as an AnyVector. Defaults to False.
batch_size (int, optional) – The number of unprocessed messages to trigger the handler. Defaults to 0.
throttle (float, optional) – The maximum waiting time, in seconds, before the handler processes the incoming messages if the batch_size condition has not been reached. Defaults to 1.
hash (int, optional) – A hash value indicating which subscription executor will process the incoming messages for this subscription. Defaults to -1.
reconnect (bool, optional) – Specifies whether to automatically attempt to resume the subscription if interrupted. Defaults to False.
filter (optional) – The filter condition(s). Defaults to None.
persist_offset (bool, optional) – Indicates whether to persist the offset of the last processed message in the current subscription. Defaults to False.
time_trigger (bool, optional) – If set to True, handler will be triggered at the intervals specified by throttle even if no new messages arrive. Defaults to False.
handler_need_msg_id (bool, optional) – Determines the required parameters for the handler. If True, the handler must accept both msg_body (messages to be ingested) and msg_id (ID of the last ingested message). If False, the handler only requires msg_body. Defaults to False.
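The two handler shapes described for handler_need_msg_id can be illustrated without a running server. In the sketch below, the deliver function is a hypothetical stand-in for whatever invokes the handler inside the subscription runtime. It is not a library call, and the string "rows" are placeholder data rather than real Table or AnyVector objects.

```python
from typing import Any, Callable, List

received: List[Any] = []


def unary_handler(msg_body: Any) -> None:
    # Shape expected when handler_need_msg_id=False: one parameter.
    received.append(("unary", msg_body))


def binary_handler(msg_body: Any, msg_id: int) -> None:
    # Shape expected when handler_need_msg_id=True: two parameters.
    received.append(("binary", msg_body, msg_id))


def deliver(handler: Callable[..., None], msg_body: Any, msg_id: int,
            handler_need_msg_id: bool = False) -> None:
    """Hypothetical stand-in for the subscription runtime, for illustration only."""
    if handler_need_msg_id:
        handler(msg_body, msg_id)
    else:
        handler(msg_body)


deliver(unary_handler, ["row-1", "row-2"], msg_id=1)
deliver(binary_handler, ["row-3"], msg_id=2, handler_need_msg_id=True)

print(received)
```

Passing a unary function while handler_need_msg_id is True would fail in this sketch with a TypeError, which mirrors why the documentation requires a binary function in that case.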
- Returns:
An instance of SubscriptionHelper that allows further configuration and submission of the subscription. This object enables setting up the optional parameters.
- Return type:
SubscriptionHelper
Examples
>>> def my_handler(message):
...     print(f"Received message: {message}")
...
>>> subscription = table.subscribe(
...     action_name="action_name",
...     handler=my_handler,
...     offset=0,
...     batch_size=10,
...     reconnect=True,
...     persist_offset=True
... )
Stream tables are tables that support real-time data ingestion.
A helper class for managing the stream subscription, allowing further configuration and submission of the subscription.
Creates a StreamTable from data, schema, or empty specification.
Retrieves a Topic by its associated StreamTable name and action name, or by its name.
Checks if a specific action exists for a given StreamTable, or if a Topic exists.
Retrieves the names of all shared StreamTables.
Retrieves the names of all unloaded persisted StreamTables.
Checks if a StreamTable with the specified name exists.
Drops the StreamTable with the specified name.