SubscriptionHelper

class swordfish._swordfishcpp.SubscriptionHelper

A helper class for managing a stream subscription. It allows the subscription to be configured further and then submitted.

offset(val=-1)

Sets the position of the first message where the subscription begins.

Parameters:

val (int, optional) – The offset position. Defaults to -1.

Returns:

The instance itself.

Return type:

Self

Note

A message is a row of the stream table. The offset is relative to the first row of the stream table at the time the table was created. If val is unspecified or -1, the subscription starts with the next new message. If val is -2, the system retrieves the persisted offset on disk and starts the subscription from there. Rows that were cleared from memory because of the cache size limit still count when determining where the subscription starts.
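The offset rules in this note can be modelled in a few lines of plain Python. This is only a sketch: `resolve_start`, `rows_published`, and `persisted_offset` are hypothetical names that are not part of swordfish, and the sketch assumes that a persisted offset is used directly as the starting row.

```python
from typing import Optional

NEXT_NEW_MESSAGE = -1   # default: start with the next new message
PERSISTED_OFFSET = -2   # start from the offset persisted on disk


def resolve_start(val: int, rows_published: int,
                  persisted_offset: Optional[int] = None) -> int:
    """Return the row index at which a subscription would start.

    rows_published counts every row appended since the stream table was
    created, including rows already cleared from memory.
    """
    if val == NEXT_NEW_MESSAGE:
        return rows_published
    if val == PERSISTED_OFFSET:
        if persisted_offset is None:
            raise ValueError("no persisted offset available")
        return persisted_offset  # assumption: used as-is as the start row
    if val < 0:
        raise ValueError(f"unsupported offset: {val}")
    return val  # explicit offset, relative to the table's first row
```

For example, with 100 rows published so far, `resolve_start(-1, 100)` points at row 100, the next row to arrive, even if the earliest rows are no longer held in memory.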

msg_as_table(val=False)

Sets whether the subscribed data is ingested into the handler as a table or as an AnyVector.

Parameters:

val (bool, optional) – Whether to ingest the subscribed data into the handler as a table. Defaults to False.

Returns:

The instance itself.

Return type:

Self

Note

If val is True, the subscribed data is ingested into the handler as a table, allowing it to be processed with SQL statements. The default value is False, meaning the subscribed data is ingested as an AnyVector of columns.

batch_size(val=0)

Sets the number of unprocessed messages required to trigger the handler.

Parameters:

val (int, optional) – The batch size threshold. Defaults to 0.

Returns:

The instance itself.

Return type:

Self

Note

If val is positive, the handler does not process messages until the number of unprocessed messages reaches val. If val is unspecified or non-positive, the handler processes incoming messages as soon as they arrive.

throttle(val=1)

Sets the maximum waiting time before the handler processes incoming messages if the batch_size condition has not been met.

Parameters:

val (float, optional) – The maximum waiting time in seconds. Defaults to 1.

Returns:

The instance itself.

Return type:

Self

Note

val is given in seconds and has no effect if batch_size is not specified. To set val to less than 1 second, the subThrottle configuration must be modified first.
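The interaction between batch_size and throttle can be summarised as a small decision function. The sketch below is an illustrative model in plain Python, not the engine's implementation: `should_trigger`, `pending`, and `elapsed` are hypothetical names, and the model ignores time_trigger.

```python
def should_trigger(pending: int, elapsed: float,
                   batch_size: int = 0, throttle: float = 1.0) -> bool:
    """Decide whether the handler should run now (illustrative model).

    pending: number of unprocessed messages.
    elapsed: seconds since the handler last ran.
    """
    if pending <= 0:
        return False            # nothing to process
    if batch_size <= 0:
        return True             # no batching: process messages on arrival
    if pending >= batch_size:
        return True             # batch threshold reached
    return elapsed >= throttle  # partial batch has waited long enough
```

With `batch_size=10` and `throttle=1.0`, five pending messages are held back at 0.2 seconds but processed once a full second has passed, which is the fallback described by throttle.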

hash(val=-1)

Sets the hash value determining the subscription executor.

Parameters:

val (int, optional) – The hash value for assigning an executor. Defaults to -1.

Returns:

The instance itself.

Return type:

Self

Note

val must be a non-negative integer; it specifies which subscription executor processes the incoming messages. If val is unspecified (the default, -1), the system assigns an executor automatically. To keep messages from multiple subscriptions synchronized, set the same hash value for all of them so that they are processed by the same executor.
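The following sketch illustrates why equal hash values keep subscriptions on one executor. The modulo mapping is an assumption made purely for illustration: this page does not state how a hash value is mapped to an executor, and `pick_executor` and `n_executors` are hypothetical names.

```python
def pick_executor(hash_value: int, n_executors: int) -> int:
    """Map a hash value to an executor index (assumed modulo mapping)."""
    if hash_value < 0:
        raise ValueError("hash value must be non-negative")
    if n_executors <= 0:
        raise ValueError("need at least one executor")
    return hash_value % n_executors


# Two subscriptions sharing hash value 3 land on the same executor,
# so their messages are processed by a single worker.
trades_executor = pick_executor(3, n_executors=4)
quotes_executor = pick_executor(3, n_executors=4)
```

Whatever the real mapping is, the property that matters is that it is deterministic: the same hash value always selects the same executor.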

reconnect(val=False)

Sets whether the subscription can be automatically resumed if interrupted.

Parameters:

val (bool, optional) – Whether to enable automatic resubscription. Defaults to False.

Returns:

The instance itself.

Return type:

Self

Note

If val is True, the subscription attempts to resume and retrieve all streaming data since the interruption. Behavior depends on the interruption type:

  • If the network is disconnected but both nodes remain running, reconnection occurs automatically when the network is restored.

  • If the publisher node crashes, the subscriber retries resubscribing after the publisher restarts:

    • If the publisher adopts data persistence mode, resubscription succeeds only after persisted data has been loaded and the publisher reaches the row of data where the subscription was interrupted.

    • If the publisher does not adopt data persistence, resubscription fails.

  • If the subscriber node crashes, automatic resubscription is not possible and the subscription must be submitted again.
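The cases above form a small decision table. The sketch below encodes it in plain Python for quick reference; `Interruption` and `resubscribe_outcome` are hypothetical names, and the function only restates the documented behaviour when reconnect is True. It does not perform any reconnection.

```python
from enum import Enum


class Interruption(Enum):
    NETWORK_DOWN = "network disconnected, both nodes running"
    PUBLISHER_CRASH = "publisher node crashed"
    SUBSCRIBER_CRASH = "subscriber node crashed"


def resubscribe_outcome(kind: Interruption,
                        publisher_persists: bool = False) -> str:
    """Describe what happens after an interruption when reconnect is True."""
    if kind is Interruption.NETWORK_DOWN:
        return "reconnects automatically once the network is restored"
    if kind is Interruption.PUBLISHER_CRASH:
        if publisher_persists:
            return ("resubscribes after the publisher restarts and reloads "
                    "persisted data up to the interrupted row")
        return "fails: the publisher does not persist data"
    # Subscriber crash: the subscription itself is lost.
    return "not possible: submit the subscription again"
```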

filter(val=None)

Sets the filter condition(s) for the subscription.

Parameters:

val (optional) – The filter condition(s) for the subscription. Defaults to None.

Returns:

The instance itself.

Return type:

Self

Note

filter must be used together with the set_filter_column function. The filter can take any of the following forms:

  • Value filtering: A Vector specifying allowed values.

  • Range filtering: A Pair defining an inclusive lower bound and an exclusive upper bound.

  • Hash filtering: An AnyVector where:

    • The first element is the number of buckets.

    • The second element is either a scalar specifying the bucket index (starting from 0) or a Pair specifying an index range (inclusive lower bound, exclusive upper bound).

  • Custom function filtering: A FunctionDef or a str (indicating function name or lambda expression). The subscribed data is passed into the function as a table, and the function result is sent to the subscriber.

filter does not support Boolean types.
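The first three filter forms can be illustrated with plain Python over a list of filter-column values. This is a sketch of the selection semantics only: the function names are hypothetical, no swordfish types are used, and bucketing integers with the modulo operator is an assumption, since the hash function itself is not described here.

```python
from typing import Iterable, List, Set, Tuple, Union


def value_filter(keys: Iterable[int], allowed: Set[int]) -> List[int]:
    """Keep keys that appear in the set of allowed values."""
    return [k for k in keys if k in allowed]


def range_filter(keys: Iterable[int], bounds: Tuple[int, int]) -> List[int]:
    """Keep keys in [lower, upper): inclusive lower, exclusive upper."""
    lower, upper = bounds
    return [k for k in keys if lower <= k < upper]


def hash_filter(keys: Iterable[int], n_buckets: int,
                bucket: Union[int, Tuple[int, int]]) -> List[int]:
    """Keep keys whose bucket index matches a single index or a range.

    Assumption: an integer key falls into bucket (key % n_buckets).
    """
    if isinstance(bucket, tuple):
        lower, upper = bucket              # inclusive lower, exclusive upper
    else:
        lower, upper = bucket, bucket + 1  # single bucket index
    return [k for k in keys if lower <= k % n_buckets < upper]


keys = [1, 2, 3, 4, 5, 6, 7, 8]
by_value = value_filter(keys, {2, 4})      # [2, 4]
by_range = range_filter(keys, (3, 6))      # [3, 4, 5]
by_bucket = hash_filter(keys, 4, 1)        # [1, 5]
by_buckets = hash_filter(keys, 4, (0, 2))  # [1, 4, 5, 8]
```

Note that both the range form and the bucket-range form exclude their upper bound, so `(3, 6)` selects 3, 4, and 5 but not 6.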

persist_offset(val=False)

Sets whether to persist the offset of the last processed message.

Parameters:

val (bool, optional) – Whether to persist the last processed message offset. Defaults to False.

Returns:

The instance itself.

Return type:

Self

Note

The persisted offset is useful for resubscription and can be retrieved using Topic.processed_offset.

To resubscribe from the persisted offset, set persist_offset to True and set the remove_offset parameter of unsubscribe to False.

time_trigger(val=False)

Sets whether the handler is triggered at intervals even if no new messages arrive.

Parameters:

val (bool, optional) – Whether to trigger the handler at fixed intervals. Defaults to False.

Returns:

The instance itself.

Return type:

Self

Note

If val is True, the handler triggers at the intervals specified by throttle, even when no new messages are received.

handler_need_msg_id(val=False)

Sets whether the handler requires message IDs.

Parameters:

val (bool, optional) – Whether the handler requires message IDs. Defaults to False.

Returns:

The instance itself.

Return type:

Self

Note

If val is True, the handler must accept two parameters:

  • msg_body: The messages ingested into the streaming engine.

  • msg_id: The ID of the last ingested message.

If val is False, the handler must accept only one parameter: msg_body.
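The two handler shapes look as follows in Python. The handlers use the parameter names from this note; `dispatch` and `processed` are hypothetical stand-ins added only to make the sketch runnable, and they do not reflect how the engine actually invokes handlers.

```python
from typing import Any, Callable, List, Optional, Tuple

# Records what each handler received, for demonstration only.
processed: List[Tuple[Any, Optional[int]]] = []


def handler_body_only(msg_body: Any) -> None:
    """Handler shape when handler_need_msg_id is False."""
    processed.append((msg_body, None))


def handler_with_id(msg_body: Any, msg_id: int) -> None:
    """Handler shape when handler_need_msg_id is True."""
    processed.append((msg_body, msg_id))


def dispatch(handler: Callable[..., None], need_msg_id: bool,
             msg_body: Any, msg_id: int) -> None:
    """Hypothetical stand-in for the engine calling the handler."""
    if need_msg_id:
        handler(msg_body, msg_id)   # two arguments: body and last message ID
    else:
        handler(msg_body)           # one argument: body only


dispatch(handler_body_only, False, ["a", "b"], 7)
dispatch(handler_with_id, True, ["c"], 8)
```

A handler whose signature does not match the setting would fail at call time, so the flag and the handler definition should be changed together.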

submit()

Submits the current state of the subscription.

Returns:

The current topic or stream to which the subscription is submitted.

Return type:

Topic
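Because every configuration method returns the instance itself, calls can be chained and finished with submit(). The sketch below demonstrates that pattern with a stand-in class. `SketchSubscriptionHelper` is hypothetical and is not the swordfish class: it only records settings, and its submit() returns a plain dict rather than a Topic.

```python
from typing import Any, Dict


class SketchSubscriptionHelper:
    """Stand-in that mimics the chaining style; NOT the swordfish class."""

    def __init__(self) -> None:
        self.settings: Dict[str, Any] = {}

    def _set(self, name: str, val: Any) -> "SketchSubscriptionHelper":
        self.settings[name] = val
        return self  # returning self is what makes chaining possible

    def offset(self, val: int = -1) -> "SketchSubscriptionHelper":
        return self._set("offset", val)

    def msg_as_table(self, val: bool = False) -> "SketchSubscriptionHelper":
        return self._set("msg_as_table", val)

    def batch_size(self, val: int = 0) -> "SketchSubscriptionHelper":
        return self._set("batch_size", val)

    def throttle(self, val: float = 1) -> "SketchSubscriptionHelper":
        return self._set("throttle", val)

    def submit(self) -> Dict[str, Any]:
        # The real submit() returns a Topic; this stand-in returns a copy
        # of the recorded settings so the chain can be inspected.
        return dict(self.settings)


submitted = (
    SketchSubscriptionHelper()
    .offset(0)
    .msg_as_table(True)
    .batch_size(1000)
    .throttle(2)
    .submit()
)
```

Only the settings that were set explicitly appear in `submitted`; in the real class, any method left uncalled keeps the default shown in its signature.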