Subscription Options
There are four recommended subscription options in the DolphinDB Python API: single-record subscription, batch subscription with msgAsTable=False, batch subscription with msgAsTable=True, and heterogeneous stream table subscription. The following sections explain when to use each option. For details on the parameters of subscribe, see Subscription.
Single-Record Subscription
For single-record subscription, leave batchSize unspecified and set msgAsTable to False. The throttle parameter has no effect in this mode.
In this example, we use Session.run to create a stream table, call Session.enableStreaming to enable subscription, and define the handler. Once the subscription has been set up, we use Session.run to insert records into the stream table. The API prints each record as soon as it is received. After 3 seconds, we cancel the subscription by calling unsubscribe.
import dolphindb as ddb
import numpy as np
import time
s = ddb.Session()
s.connect("192.168.1.113", 8848, "admin", "123456")
s.run("""
share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
""")
s.enableStreaming()
def handler(lst):
    print(lst)
s.subscribe("192.168.1.113", 8848, handler, "trades", "SingleMode", offset=-1)
s.run("insert into trades values(take(now(), 6), take(`000905`600001`300201`000908`600002, 6), rand(1000,6)/10.0, 1..6)")
time.sleep(3)
s.unsubscribe("192.168.1.113", 8848, "trades", "SingleMode")
Output:
[numpy.datetime64('2023-03-17T12:06:30.439'), '000905', 36.7, 1]
[numpy.datetime64('2023-03-17T12:06:30.439'), '600001', 80.7, 2]
[numpy.datetime64('2023-03-17T12:06:30.439'), '300201', 68.7, 3]
[numpy.datetime64('2023-03-17T12:06:30.439'), '000908', 52.2, 4]
[numpy.datetime64('2023-03-17T12:06:30.439'), '600002', 45.1, 5]
[numpy.datetime64('2023-03-17T12:06:30.439'), '000905', 55.1, 6]
In stream subscription, the DolphinDB Python API uses only PROTOCOL_DDB for deserialization. Each record is converted from DolphinDB ANY vector to a list. For details about the conversion, see PROTOCOL_DDB.
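Because each record arrives as a plain list, a handler often needs to pair the values with column names. The following is a minimal sketch that assumes the column order of the trades table defined above; record_to_dict and COLUMNS are hypothetical names, not part of the DolphinDB API, and the sample record uses a string in place of the numpy.datetime64 value so that the snippet runs without a server.

```python
# Column names in the order used by the `trades` stream table above.
COLUMNS = ("time", "sym", "price", "id")


def record_to_dict(record, columns=COLUMNS):
    """Pair the values of one received record with their column names."""
    if len(record) != len(columns):
        raise ValueError(
            f"expected {len(columns)} values, got {len(record)}"
        )
    return dict(zip(columns, record))


def handler(lst):
    # Intended for single-record subscription: one call per record.
    print(record_to_dict(lst))


# Stand-in for a record delivered by the subscription.
sample = ["2023-03-17T12:06:30.439", "000905", 36.7, 1]
handler(sample)
```

A handler like this could be passed to subscribe in place of the print-only handler shown earlier.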
Batch Subscription with msgAsTable=False
To enable batch subscription, the batchSize and throttle parameters must be specified. When the received records exceed batchSize, or the waiting time exceeds throttle, the received records will be passed in batches to the handler function for processing. When msgAsTable is set to False, each batch of downloaded data is a list where each element is a single record, just like the results of single-record subscription.
In this example, batchSize is set to 2 and throttle is set to 0.1, indicating that if 2 records are received within 0.1 second, the records will be immediately passed to the handler function; or if only 1 record is received within 0.1 second, the record will be passed to the handler function once the 0.1 second waiting time is over. Similar to single-record subscription, PROTOCOL_DDB is used for data type conversion. Each record is converted from DolphinDB ANY vector to a list.
import dolphindb as ddb
import numpy as np
import time
s = ddb.Session()
s.connect("192.168.1.113", 8848, "admin", "123456")
s.run("""
share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
""")
s.enableStreaming()
def handler(lsts):
    print(lsts)
s.subscribe("192.168.1.113", 8848, handler, "trades", "MultiMode1", offset=-1, batchSize=2, throttle=0.1, msgAsTable=False)
s.run("insert into trades values(take(now(), 6), take(`000905`600001`300201`000908`600002, 6), rand(1000,6)/10.0, 1..6)")
time.sleep(3)
s.unsubscribe("192.168.1.113", 8848, "trades", "MultiMode1")
Output:
[[numpy.datetime64('2023-03-17T14:46:27.358'), '000905', 21.2, 1], [numpy.datetime64('2023-03-17T14:46:27.358'), '600001', 39.8, 2]]
[[numpy.datetime64('2023-03-17T14:46:27.358'), '300201', 84.0, 3], [numpy.datetime64('2023-03-17T14:46:27.358'), '000908', 26.2, 4]]
[[numpy.datetime64('2023-03-17T14:46:27.358'), '600002', 25.1, 5], [numpy.datetime64('2023-03-17T14:46:27.358'), '000905', 42.7, 6]]
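Since each batch is a list of records, a handler can iterate over it directly. The sketch below averages the price per symbol within one batch, assuming the column order time, sym, price, id from the example above. The function name mean_price_by_symbol is hypothetical, and the timestamps are written as strings so that the snippet runs without a server.

```python
from collections import defaultdict


def mean_price_by_symbol(batch):
    """Average the price column per symbol for one batch of records."""
    totals = defaultdict(lambda: [0.0, 0])  # sym -> [sum of prices, count]
    for _time, sym, price, _id in batch:
        totals[sym][0] += price
        totals[sym][1] += 1
    return {sym: total / count for sym, (total, count) in totals.items()}


def handler(lsts):
    # Intended for batch subscription with msgAsTable=False.
    print(mean_price_by_symbol(lsts))


# Stand-in for the first batch shown in the output above.
batch = [
    ["2023-03-17T14:46:27.358", "000905", 21.2, 1],
    ["2023-03-17T14:46:27.358", "600001", 39.8, 2],
]
handler(batch)
```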
Batch Subscription with msgAsTable=True
Setting msgAsTable to True means that the handler function receives data in discrete blocks. The maximum size of a block is specified by the server configuration parameter maxMsgNumPerBlock (see the DolphinDB User Manual). When the number of received records exceeds batchSize, the handler processes them block by block.
In this example, after enabling batch subscription, we call Session.run to append 1,500 records to the stream table. As a block contains up to 1,024 records by default, once the API receives the first block containing 1,024 records (exceeding batchSize), the handler is triggered. The remaining 476 records are passed to the handler after the throttle time of 0.1 second has elapsed. The result is 2 DataFrames, one containing 1,024 records and the other containing 476 records.
import dolphindb as ddb
import numpy as np
import time
s = ddb.Session()
s.connect("192.168.1.113", 8848, "admin", "123456")
s.run("""
share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
""")
s.enableStreaming()
def handler(lsts):
    print(lsts)
s.subscribe("192.168.1.113", 8848, handler, "trades", "MultiMode2", offset=-1, batchSize=1000, throttle=0.1, msgAsTable=True)
s.run("n=1500;insert into trades values(take(now(), n), take(`000905`600001`300201`000908`600002, n), rand(1000,n)/10.0, 1..n)")
time.sleep(3)
s.unsubscribe("192.168.1.113", 8848, "trades", "MultiMode2")
If we change batchSize to 1,500 and append 3,000 records to the stream table, the first block containing 1,024 records will not trigger the handler. After the API receives the second block containing 1,024 records, the total number of received records exceeds batchSize, and the handler is immediately triggered. The 2,048 records are converted from a DolphinDB table to a pandas DataFrame using PROTOCOL_DDB. The third block, containing 952 records, is passed to the handler after the 0.1-second throttle window. The final output of the handler is two DataFrames with 2,048 records and 952 records, respectively.
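The arithmetic in the two scenarios above can be checked with a small model. This is a simplified sketch of the behavior described in this section, not the API's actual implementation: it assumes that all records are appended at once, that blocks hold up to 1,024 records, that the handler fires as soon as the buffered count reaches batchSize, and that whatever remains is flushed when the throttle time elapses. The function name simulate_handler_calls is hypothetical.

```python
def simulate_handler_calls(total_records, batch_size, block_size=1024):
    """Return the number of records passed to the handler on each call."""
    calls = []
    buffered = 0
    remaining = total_records
    while remaining > 0:
        # Records arrive from the server one block at a time.
        block = min(block_size, remaining)
        remaining -= block
        buffered += block
        if buffered >= batch_size:
            # Threshold reached: all buffered blocks go to the handler.
            calls.append(buffered)
            buffered = 0
    if buffered:
        # Leftover records are delivered once the throttle time elapses.
        calls.append(buffered)
    return calls


print(simulate_handler_calls(1500, 1000))  # [1024, 476]
print(simulate_handler_calls(3000, 1500))  # [2048, 952]
```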
Heterogeneous Stream Table Subscription
Starting from DolphinDB server version 1.30.17/2.00.5, the replay function supports replaying (serializing) tables with different schemata into a single "heterogeneous" stream table.
Starting from Python API 1.30.19, the streamDeserializer class is provided to enable Python API users to subscribe to a heterogeneous stream table and deserialize the content from that table.
streamDeserializer
Construct a deserializer for a heterogeneous stream table with the streamDeserializer class.
streamDeserializer(sym2table, session=None)
- sym2table: a dictionary object indicating the input tables of replay (see DolphinDB User Manual - replay). The keys are the table identifiers as specified in the inputTables parameter of replay, and the values indicate the schema of each table. The ingested data will be parsed based on sym2table.
- session: a session object connected to the DolphinDB server. The default value is None, indicating the current session.
Constructing a deserializer for a heterogeneous stream table:
sd = ddb.streamDeserializer({
'msg1': ["dfs://test_StreamDeserializer_pair", "pt1"],
'msg2': "pt2",
}, session=s)
The keys of sym2table are the table identifiers as specified in the inputTables parameter of replay; the values are table names (for in-memory tables), or lists/tuples consisting of the path to a DFS database and a table name (for DFS tables). The tables named here need not be the same tables as the input tables of replay; they only need to have the same schemata.
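To make the two accepted value shapes concrete, the sketch below classifies each entry of a sym2table dictionary as either an in-memory table name or a DFS database path plus table name. The helper describe_sym2table is hypothetical and only inspects a plain Python dictionary; it does not call the DolphinDB API or check that the tables exist.

```python
def describe_sym2table(sym2table):
    """Classify each sym2table value as an in-memory or DFS table reference."""
    described = {}
    for key, value in sym2table.items():
        if isinstance(value, str):
            # A bare string names an in-memory (for example, shared) table.
            described[key] = ("in-memory", value)
        elif isinstance(value, (list, tuple)) and len(value) == 2:
            # A two-element list/tuple is [DFS database path, table name].
            db_path, table_name = value
            described[key] = ("dfs", db_path, table_name)
        else:
            raise ValueError(f"unsupported sym2table value for {key!r}: {value!r}")
    return described


sym2table = {
    "msg1": ["dfs://test_StreamDeserializer_pair", "pt1"],
    "msg2": "pt2",
}
for key, info in describe_sym2table(sym2table).items():
    print(key, info)
```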
Note
(1) When replaying into a heterogeneous stream table in DolphinDB, the inputTables parameter is a dictionary whose keys are the table identifiers and whose values must be in-memory tables or data sources defined by the built-in function replayDS. For more information, see DolphinDB User Manual - replay.
(2) When constructing a streamDeserializer with Python API, the dictionary values of sym2table are tables (can be DFS tables, stream tables or in-memory tables) whose schemata must be the same as the tables specified by inputTables in replay.
(3) When subscribing to a heterogeneous stream table, msgAsTable must be False; batchSize and throttle can be specified.
(4) For each output record, the streamDeserializer appends an additional column indicating the input source (i.e., a dictionary key of sym2table) of the current record, so the output records can be further filtered (see Example 1 below).
Example 1. Subscribing to a Heterogeneous Table (with DFS Partitioned Tables as inputTables)
This example first replays two DFS partitioned tables into a heterogeneous stream table in DolphinDB. Then in the Python client, define a stream deserializer "sd" for deserializing the heterogeneous stream table.
Defining a Heterogeneous Stream Table
In DolphinDB, define the output table for the replay, i.e., the heterogeneous stream table.
try{dropStreamTable(`outTables)}catch(ex){}
share streamTable(100:0, `timestampv`sym`blob`price1,[TIMESTAMP,SYMBOL,BLOB,DOUBLE]) as outTables
Create two DFS partitioned tables as the input tables to replay:
n = 6;
dbName = 'dfs://test_StreamDeserializer_pair'
if(existsDatabase(dbName)){
    dropDB(dbName)
}
db = database(dbName,RANGE,2012.01.01 2013.01.01 2014.01.01 2015.01.01 2016.01.01 2017.01.01 2018.01.01 2019.01.01)
table1 = table(100:0, `datetimev`timestampv`sym`price1`price2, [DATETIME, TIMESTAMP, SYMBOL, DOUBLE, DOUBLE])
table2 = table(100:0, `datetimev`timestampv`sym`price1, [DATETIME, TIMESTAMP, SYMBOL, DOUBLE])
tableInsert(table1, 2012.01.01T01:21:23 + 1..n, 2018.12.01T01:21:23.000 + 1..n, take(`a`b`c,n), rand(100,n)+rand(1.0, n), rand(100,n)+rand(1.0, n))
tableInsert(table2, 2012.01.01T01:21:23 + 1..n, 2018.12.01T01:21:23.000 + 1..n, take(`a`b`c,n), rand(100,n)+rand(1.0, n))
pt1 = db.createPartitionedTable(table1,'pt1',`datetimev).append!(table1)
pt2 = db.createPartitionedTable(table2,'pt2',`datetimev).append!(table2)
Convert the tables to data sources with replayDS for replay:
re1 = replayDS(sqlObj=<select * from pt1>, dateColumn=`datetimev, timeColumn=`timestampv)
re2 = replayDS(sqlObj=<select * from pt2>, dateColumn=`datetimev, timeColumn=`timestampv)
d = dict(['msg1', 'msg2'], [re1, re2])
replay(inputTables=d, outputTables=`outTables, dateColumn=`timestampv, timeColumn=`timestampv)
Subscribing to the Heterogeneous Stream Table
import dolphindb as ddb
# the last column of each output record of the streamDeserializer is a key from sym2table
def streamDeserializer_handler(lst):
    if lst[-1]=="msg1":
        print("Msg1: ", lst)
    elif lst[-1]=='msg2':
        print("Msg2: ", lst)
    else:
        print("Error: ", lst)
s = ddb.Session()
s.connect("192.168.1.113", 8848, "admin", "123456")
s.enableStreaming()
# Pass in a list containing the DFS database paths and partitioned tables to indicate the schemata
sd = ddb.streamDeserializer({
'msg1': ["dfs://test_StreamDeserializer_pair", "pt1"],
'msg2': ["dfs://test_StreamDeserializer_pair", "pt2"],
}, session=s)
s.subscribe(host="192.168.1.113", port=8848, handler=streamDeserializer_handler, tableName="outTables", actionName="action", offset=0, resub=False,
msgAsTable=False, streamDeserializer=sd, userName="admin", password="123456")
from threading import Event
Event().wait()
Output:
Msg2: [numpy.datetime64('2012-01-01T01:21:24'), numpy.datetime64('2018-12-01T01:21:23.001'), 'a', 18.43745171907358, 'msg2']
Msg1: [numpy.datetime64('2012-01-01T01:21:24'), numpy.datetime64('2018-12-01T01:21:23.001'), 'a', 65.69160503265448, 41.17562178615481, 'msg1']
Msg2: [numpy.datetime64('2012-01-01T01:21:25'), numpy.datetime64('2018-12-01T01:21:23.002'), 'b', 93.68146854126826, 'msg2']
Msg1: [numpy.datetime64('2012-01-01T01:21:25'), numpy.datetime64('2018-12-01T01:21:23.002'), 'b', 22.181119214976206, 38.162505637388676, 'msg1']
Msg2: [numpy.datetime64('2012-01-01T01:21:26'), numpy.datetime64('2018-12-01T01:21:23.003'), 'c', 51.19852650281973, 'msg2']
Msg1: [numpy.datetime64('2012-01-01T01:21:26'), numpy.datetime64('2018-12-01T01:21:23.003'), 'c', 16.937458558939397, 36.79589221812785, 'msg1']
Msg2: [numpy.datetime64('2012-01-01T01:21:27'), numpy.datetime64('2018-12-01T01:21:23.004'), 'a', 0.812068443512544, 'msg2']
Msg1: [numpy.datetime64('2012-01-01T01:21:27'), numpy.datetime64('2018-12-01T01:21:23.004'), 'a', 34.11729482654482, 29.094212289899588, 'msg1']
Msg2: [numpy.datetime64('2012-01-01T01:21:28'), numpy.datetime64('2018-12-01T01:21:23.005'), 'b', 93.43341179518029, 'msg2']
Msg1: [numpy.datetime64('2012-01-01T01:21:28'), numpy.datetime64('2018-12-01T01:21:23.005'), 'b', 9.413380537647754, 32.449754945002496, 'msg1']
Msg2: [numpy.datetime64('2012-01-01T01:21:29'), numpy.datetime64('2018-12-01T01:21:23.006'), 'c', 65.18307867064141, 'msg2']
Msg1: [numpy.datetime64('2012-01-01T01:21:29'), numpy.datetime64('2018-12-01T01:21:23.006'), 'c', 83.58133838768117, 54.27990723075345, 'msg1']
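When more than two input tables are involved, the if/elif chain in the handler can be replaced with a lookup table keyed by the source identifier. The following is a minimal sketch under that assumption; make_dispatcher is a hypothetical helper, not part of the DolphinDB API, and the sample records use strings in place of numpy.datetime64 values and rounded prices so that the snippet runs without a server.

```python
def make_dispatcher(handlers):
    """Build a handler that routes each record by its trailing source key."""

    def handler(record):
        source = record[-1]  # last element is the sym2table key
        if source not in handlers:
            raise KeyError(f"no handler registered for source {source!r}")
        # Pass the record without the source key to the matching callback.
        handlers[source](record[:-1])

    return handler


msg1_rows, msg2_rows = [], []
dispatch = make_dispatcher({"msg1": msg1_rows.append, "msg2": msg2_rows.append})

# Stand-ins for records produced by the deserializer.
dispatch(["2012-01-01T01:21:24", "2018-12-01T01:21:23.001", "a", 18.4, "msg2"])
dispatch(["2012-01-01T01:21:24", "2018-12-01T01:21:23.001", "a", 65.7, 41.2, "msg1"])

print(len(msg1_rows), len(msg2_rows))
```

The returned function takes a single record, so it could be passed as the handler argument of subscribe when batchSize is not specified.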
Example 2. Subscribing to a Heterogeneous Table (with In-Memory Tables as inputTables)
This example replays two in-memory tables into a heterogeneous stream table in DolphinDB. In Python, construct a streamDeserializer with the names of shared tables indicating the input table schemata. Specify batchSize as 4 for batch subscription.
A total of 6*2=12 records are replayed. The records are sent to the handler in 3 batches (each batch may contain records from different input tables). The handler function is called 3 times in total, each time outputting 4 records in a batch.
Defining a Heterogeneous Stream Table
try{dropStreamTable(`outTables)}catch(ex){}
// define output table
share streamTable(100:0, `timestampv`sym`blob`price1,[TIMESTAMP,SYMBOL,BLOB,DOUBLE]) as outTables
n = 6;
table1 = table(100:0, `datetimev`timestampv`sym`price1`price2, [DATETIME, TIMESTAMP, SYMBOL, DOUBLE, DOUBLE])
table2 = table(100:0, `datetimev`timestampv`sym`price1, [DATETIME, TIMESTAMP, SYMBOL, DOUBLE])
tableInsert(table1, 2012.01.01T01:21:23 + 1..n, 2018.12.01T01:21:23.000 + 1..n, take(`a`b`c,n), rand(100,n)+rand(1.0, n), rand(100,n)+rand(1.0, n))
tableInsert(table2, 2012.01.01T01:21:23 + 1..n, 2018.12.01T01:21:23.000 + 1..n, take(`a`b`c,n), rand(100,n)+rand(1.0, n))
share table1 as pt1
share table2 as pt2
d = dict(['msg1', 'msg2'], [pt1, pt2])
replay(inputTables=d, outputTables=`outTables, dateColumn=`timestampv, timeColumn=`timestampv)
Subscribing to the Heterogeneous Stream Table
import dolphindb as ddb
def streamDeserializer_handler(lsts):
    print(lsts)
s = ddb.Session()
s.connect("192.168.1.113", 8848, "admin", "123456")
s.enableStreaming()
sd = ddb.streamDeserializer({
'msg1': "pt1",
'msg2': "pt2",
}, session=s)
s.subscribe(host="192.168.1.113", port=8848, handler=streamDeserializer_handler, tableName="outTables", actionName="action", offset=0, resub=False, batchSize=4,
msgAsTable=False, streamDeserializer=sd, userName="admin", password="123456")
from threading import Event
Event().wait()
Output:
[[numpy.datetime64('2012-01-01T01:21:24'), numpy.datetime64('2018-12-01T01:21:23.001'), 'a', 87.90784921264276, 'msg2'], [numpy.datetime64('2012-01-01T01:21:24'), numpy.datetime64('2018-12-01T01:21:23.001'), 'a', 14.867915444076061, 92.22166634746827, 'msg1'], [numpy.datetime64('2012-01-01T01:21:25'), numpy.datetime64('2018-12-01T01:21:23.002'), 'b', 80.60459423460998, 'msg2'], [numpy.datetime64('2012-01-01T01:21:25'), numpy.datetime64('2018-12-01T01:21:23.002'), 'b', 10.429520844481885, 29.480175042990595, 'msg1']]
[[numpy.datetime64('2012-01-01T01:21:26'), numpy.datetime64('2018-12-01T01:21:23.003'), 'c', 12.45058359648101, 'msg2'], [numpy.datetime64('2012-01-01T01:21:26'), numpy.datetime64('2018-12-01T01:21:23.003'), 'c', 55.05597074679099, 88.84371786634438, 'msg1'], [numpy.datetime64('2012-01-01T01:21:27'), numpy.datetime64('2018-12-01T01:21:23.004'), 'a', 27.357952459948137, 'msg2'], [numpy.datetime64('2012-01-01T01:21:27'), numpy.datetime64('2018-12-01T01:21:23.004'), 'a', 57.705578718334436, 25.98224212951027, 'msg1']]
[[numpy.datetime64('2012-01-01T01:21:28'), numpy.datetime64('2018-12-01T01:21:23.005'), 'b', 63.73548944480717, 'msg2'], [numpy.datetime64('2012-01-01T01:21:28'), numpy.datetime64('2018-12-01T01:21:23.005'), 'b', 65.34572763741016, 0.6374575316440314, 'msg1'], [numpy.datetime64('2012-01-01T01:21:29'), numpy.datetime64('2018-12-01T01:21:23.006'), 'c', 89.62549424753524, 'msg2'], [numpy.datetime64('2012-01-01T01:21:29'), numpy.datetime64('2018-12-01T01:21:23.006'), 'c', 98.75018240674399, 46.55078419903293, 'msg1']]
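Because a batch may mix records from different input tables, a batch handler usually separates them before further processing. The sketch below groups one batch by the trailing source key. The function name split_by_source is hypothetical, and the sample batch uses strings in place of numpy.datetime64 values and rounded prices so that the snippet runs without a server.

```python
from collections import defaultdict


def split_by_source(batch):
    """Group the records of one batch by their trailing source key."""
    grouped = defaultdict(list)
    for record in batch:
        # The last element identifies the input table; the rest is the row.
        grouped[record[-1]].append(record[:-1])
    return dict(grouped)


def streamDeserializer_handler(lsts):
    for source, rows in split_by_source(lsts).items():
        print(source, len(rows))


# Stand-in for one batch of four records delivered with batchSize=4.
batch = [
    ["2012-01-01T01:21:24", "2018-12-01T01:21:23.001", "a", 87.9, "msg2"],
    ["2012-01-01T01:21:24", "2018-12-01T01:21:23.001", "a", 14.9, 92.2, "msg1"],
    ["2012-01-01T01:21:25", "2018-12-01T01:21:23.002", "b", 80.6, "msg2"],
    ["2012-01-01T01:21:25", "2018-12-01T01:21:23.002", "b", 10.4, 29.5, "msg1"],
]
streamDeserializer_handler(batch)
```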