Subscription
This section explains how to subscribe to streaming data and the associated operations through DolphinDB Python API.
Enabling Streaming Data Subscription
Use the enableStreaming method of the session class to enable streaming data subscription:
enableStreaming(port=0)
- port: The subscription port on the client to subscribe to the data sent from the server.
If you are connecting to DolphinDB server 2.00.9 or a later version, this parameter can be left empty. See the following table:
| DolphinDB Server | Python API | Port Number |
| --- | --- | --- |
| all 2.00.x versions prior to 2.00.9, and all 1.30.x versions | version corresponding to the server | required |
| 2.00.9 or later | version corresponding to the server | not required |
Note:
- All 2.00.x DolphinDB server versions prior to 2.00.9 and all 1.30.x versions require the publisher to re-establish a TCP connection for data transfer after the subscription request is submitted by the subscriber.
- DolphinDB server version 2.00.9 and later enable the publisher to push data through the connection built by the subscriber. Therefore, the subscriber does not need to specify a port (the default value 0 can be used). If port is specified, it will be ignored by the API.
Example
import dolphindb as ddb
s = ddb.Session()
# server versions 1.30.x and versions prior to 2.00.9
s.enableStreaming(8000)
# server version 2.00.9 and later
s.enableStreaming()
Compatibility note: Before upgrading the server or the API to version 2.00.9 or later, cancel your existing subscriptions. Re-subscribe after the upgrade is complete.
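The version rule above can be captured in a small helper. This is a hypothetical sketch (not part of the API) that decides whether enableStreaming needs a port, assuming version strings of the form "2.00.9":

```python
def needs_subscription_port(server_version: str) -> bool:
    """Return True if enableStreaming() must be given a port.

    Hypothetical helper: servers older than 2.00.9 (including all
    1.30.x releases) require the client to open a listening port.
    """
    parts = tuple(int(p) for p in server_version.split("."))
    return parts < (2, 0, 9)

# Usage sketch (requires the dolphindb package and a running server):
# import dolphindb as ddb
# s = ddb.Session()
# s.enableStreaming(8000 if needs_subscription_port("1.30.21") else 0)
```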
Creating Subscription
Use the subscribe method to subscribe to DolphinDB stream tables. The syntax is as follows:
subscribe(host, port, handler, tableName, actionName=None, offset=-1, resub=False,
filter=None, msgAsTable=False, batchSize=0, throttle=1.0,
userName=None, password=None, streamDeserializer=None, backupSites=None,
resubscribeInterval=100, subOnce=False, *, resubTimeout=None)
Arguments
Arguments for Connection
- host: str, required. The IP address of the publisher node.
- port: int, required. The port number of the publisher node.
- userName: str, optional. The username used to connect to the server.
- password: str, optional. The password used to connect to the server.
- backupSites: list of strings, optional, default None. A backup node list with each node specified in host:port format, e.g., ["192.168.0.1:8848", "192.168.0.2:8849"].
  - If specified, the failover mechanism is automatically activated. If the subscribed node (i.e., the primary node) becomes unavailable due to a disconnection or failover, the API repeatedly attempts to reconnect across the backup nodes until a subscription succeeds.
  - The stream table definition, including the name, schema, and data entries, must be identical across the primary node and all configured backup nodes to prevent potential data inconsistencies during failover.
  - If the subscribed table is a high-availability stream table, the connection is established based on the node set of backupSites.
  - To cancel a subscription, specify the host and port of the primary node.
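Putting the connection arguments together, a failover subscription might be configured as in the following sketch. The hosts, ports, table name, and the parse_site helper are illustrative placeholders, not part of the API:

```python
# Sketch: configuring a failover subscription (placeholder hosts/ports).
primary_host, primary_port = "192.168.0.1", 8848
backup_sites = ["192.168.0.2:8849", "192.168.0.3:8850"]

def parse_site(site: str) -> tuple:
    """Split a "host:port" entry into (host, port) -- the entry format
    that backupSites expects. Illustrative helper only."""
    host, port = site.rsplit(":", 1)
    return host, int(port)

parsed = [parse_site(s) for s in backup_sites]

# Usage sketch (requires the dolphindb package and running servers):
# import dolphindb as ddb
# s = ddb.Session()
# s.enableStreaming()
# s.subscribe(primary_host, primary_port, print, "trades", "action",
#             offset=-1, resub=True, backupSites=backup_sites)
# ...
# To cancel, pass the primary node's host and port:
# s.unsubscribe(primary_host, primary_port, "trades", "action")
```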
Argument for Data Handling
- handler: required. A user-defined function to process the received data. For example:
def handler(msg):
print(msg)
Arguments for Subscription
- tableName: required. The name of the subscribed stream table.
- actionName: str, optional. The name of the subscription task.
  - Each subscription is uniquely identified with a subscription topic, which is a combination of the information about the node where the stream table is located (in the format <IPAddress>/<port>), the stream table name, and the subscription task name (if actionName is specified), separated by "/". Subscription fails if an identical subscription topic already exists.
- offset: int. The position of the first message where the subscription begins. A message is a row of the stream table. offset is relative to the first row of the subscribed stream table when it was created. If some rows were cleared from memory due to the cache size limit, they are still counted when determining where the subscription starts.
  - If offset is unspecified or -1, the subscription starts with the next new message.
  - If offset=-2, the system will get the persisted offset on disk and start the subscription from there.
- resub: bool, default False. Whether to re-subscribe after a network disconnection.
- filter: array. The filtering conditions. Only the messages with the specified values in the filtering column are published. The filtering column is set with the server function setStreamTableFilterColumn. For details, see the DolphinDB User Manual.
- resubscribeInterval: int, default 100. A non-negative integer indicating how long (in milliseconds) to wait before attempting to resubscribe after the API detects a disconnection.
- Starting from DolphinDB Python API version 3.0.2.2, resubTimeout has been renamed to resubscribeInterval.
- subOnce: bool, default False. Whether to include the originally subscribed node in subsequent reconnection attempts after a node switch. Note that this parameter does not take effect when resub=True and only a single node is subscribed.
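As a rough illustration of the reconnection arguments: with resub=True the API waits resubscribeInterval milliseconds between attempts, so the arithmetic below estimates elapsed time across failed attempts. The helper is hypothetical, and the commented subscribe call uses placeholder values:

```python
# Default value of resubscribeInterval, in milliseconds.
DEFAULT_RESUBSCRIBE_INTERVAL_MS = 100

def elapsed_before_success(failed_attempts: int,
                           interval_ms: int = DEFAULT_RESUBSCRIBE_INTERVAL_MS) -> int:
    """Illustrative arithmetic: with a fixed wait between reconnection
    attempts, n failed attempts take roughly n * interval_ms milliseconds."""
    return failed_attempts * interval_ms

# Usage sketch (placeholder host/port; requires dolphindb and a server):
# s.subscribe("192.168.1.113", 8848, print, "trades", "action",
#             offset=-1, resub=True, resubscribeInterval=500, subOnce=True)
```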
Other Arguments
- msgAsTable: bool, default False. If True, the subscribed data is passed to handler as a pandas DataFrame; if False, it is passed as a list of numpy.ndarrays. msgAsTable only takes effect when batchSize is specified. Note: If streamDeserializer is specified, msgAsTable must be set to False.
- batchSize: int. The number of unprocessed messages that triggers the handler.
  - If batchSize is a positive integer:
    - msgAsTable=False: The handler does not process messages until the number of unprocessed messages reaches batchSize. It then processes batchSize messages at a time, receiving a list with each element corresponding to a message.
    - msgAsTable=True: The messages are processed by block (the block size is set by the server configuration parameter maxMsgNumPerBlock, which defaults to 1024). For example, suppose the publisher sends a total of 1524 messages. By default they are sent in two blocks: the first containing 1024 messages and the second containing 500. If batchSize is set to 1500, the 1024 messages of the first block are not processed when it arrives, as they have not reached batchSize; when the second block arrives, the handler processes both blocks (totaling 1524 records) at once.
  - If batchSize is unspecified or non-positive, the handler processes incoming messages one by one as soon as they arrive, receiving each message as a list of its field values.
- throttle: float, default 1.0. The maximum waiting time (in seconds) before the handler processes the incoming messages. throttle has no effect if batchSize is not specified.
- streamDeserializer: optional. The deserializer for a subscribed heterogeneous stream table.
Note:
The format and size of the data passed into handler are affected by batchSize, msgAsTable, and streamDeserializer. For examples of how these parameters affect streaming data subscription, see Subscription Options.
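As a sketch of batch mode: with a positive batchSize and msgAsTable=True, the handler receives each batch as a pandas DataFrame. The handler below is illustrative (the column names mirror the trades table used in the example that follows), and the commented subscribe call uses placeholder values:

```python
import pandas as pd

def batch_handler(msg: pd.DataFrame) -> pd.Series:
    """With batchSize > 0 and msgAsTable=True, each invocation receives one
    batch of rows as a DataFrame; this sketch aggregates it per symbol."""
    return msg.groupby("sym")["price"].mean()

# Simulate one batch locally (columns follow the `trades` table below):
batch = pd.DataFrame({
    "sym": ["000905", "600001", "000905"],
    "price": [69.3, 50.0, 96.5],
})
means = batch_handler(batch)

# The handler would be registered like this (placeholder host/port):
# s.subscribe("192.168.1.113", 8848, batch_handler, "trades", "action",
#             offset=-1, batchSize=1000, throttle=0.5, msgAsTable=True)
```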
Example
Create a shared stream table in DolphinDB. Specify "sym" as the filtering column. Insert 10 records into the stream table, with 2 records for each unique "sym" value.
share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
setStreamTableFilterColumn(trades, `sym)
insert into trades values(take(now(), 10), take(`000905`600001`300201`000908`600002, 10), rand(1000,10)/10.0, 1..10)
In the Python client, enable streaming data subscription with enableStreaming (this example connects to a DolphinDB server of a version prior to 2.00.9 or a 1.30.x version, so the port number is specified). Next, define the handler function. Then call subscribe to create the subscription, specifying offset as -1 and filter as np.array(["000905"]). Call Event().wait() to block the current thread so it keeps receiving data in the background.
import dolphindb as ddb
import numpy as np
s = ddb.Session()
s.enableStreaming(8000)  # the port number must be specified for servers prior to 2.00.9 and for 1.30.x versions
def handler(lst):
print(lst)
s.subscribe("192.168.1.113", 8848, handler, "trades", "action", offset=-1, filter=np.array(["000905"]))
from threading import Event
Event().wait() # block the thread to keep receiving data
After executing the script, no data is printed. This is because the offset parameter passed to subscribe() was set to -1, which causes the subscription to begin with the next record ingested into the stream table. Therefore, the data we previously appended with the insert into statement was not published to the handler.
Now, run the insert statement in DolphinDB again:
insert into trades values(take(now(), 10), take(`000905`600001`300201`000908`600002, 10), rand(1000,10)/10.0, 1..10)
The Python process will print the received data:
[numpy.datetime64('2023-03-17T10:11:19.684'), '000905', 69.3, 1]
[numpy.datetime64('2023-03-17T10:11:19.684'), '000905', 96.5, 6]
Since the filter parameter in subscribe() was specified to keep only records with sym="000905", the other records were excluded.
Getting Subscription Topics
The getSubscriptionTopics
method returns all subscription topics in the current session.
s.getSubscriptionTopics()
Each subscription is uniquely identified with a subscription topic, which is a combination of the information about the node where the stream table is located (the IP address and port number), the stream table name, and the subscription task name (if actionName is specified), separated by "/". For example:
['192.168.1.113/8848/trades/action']
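The topic format can be sketched as a small helper (hypothetical, not part of the API):

```python
def subscription_topic(host: str, port: int, table: str, action: str = "") -> str:
    """Compose the topic string that identifies a subscription:
    <host>/<port>/<table>[/<action>]. Illustrative helper only."""
    topic = f"{host}/{port}/{table}"
    return f"{topic}/{action}" if action else topic

topic = subscription_topic("192.168.1.113", 8848, "trades", "action")
```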
Canceling Subscription
The unsubscribe method cancels an existing subscription:
unsubscribe(host, port, tableName, actionName=None)
The following script cancels the subscription we previously created:
s.unsubscribe("192.168.1.113", 8848, "trades", "action")