流订阅模式

在 Python API 中,共推荐使用四种流订阅模式:单条订阅、批量订阅(设置 msgAsTable=False)、批量订阅(设置 msgAsTable=True)和异构流数据表订阅。下面将通过四个示例来分别介绍如何使用这四种订阅模式,以及各种订阅之间的区别。有关流订阅相关参数的介绍,请参考章节流订阅

单条订阅

使用单条订阅模式,不需要指定 batchSize,此时 msgAsTable 应为 False,throttle 参数无效。

下例中,首先通过 session.run 执行脚本来构造流数据表,然后调用 session.enableStreaming 方法启用流订阅,再定义回调函数 handler。开始订阅后,调用 session.run 执行写入脚本,API 立刻收到消息并将结果打印出来。等待 3 秒后,调用 unsubscribe 取消订阅。

import dolphindb as ddb
import numpy as np
import time

s = ddb.session()
s.connect("192.168.1.113", 8848, "admin", "123456")

s.run("""
share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
""")

s.enableStreaming()

def handler(lst):
    print(lst)

s.subscribe("192.168.1.113", 8848, handler, "trades", "SingleMode", offset=-1)

s.run("insert into trades values(take(now(), 6), take(`000905`600001`300201`000908`600002, 6), rand(1000,6)/10.0, 1..6)")

time.sleep(3)

s.unsubscribe("192.168.1.113", 8848, "trades", "SingleMode")

输出结果如下所示:

[numpy.datetime64('2023-03-17T12:06:30.439'), '000905', 36.7, 1]
[numpy.datetime64('2023-03-17T12:06:30.439'), '600001', 80.7, 2]
[numpy.datetime64('2023-03-17T12:06:30.439'), '300201', 68.7, 3]
[numpy.datetime64('2023-03-17T12:06:30.439'), '000908', 52.2, 4]
[numpy.datetime64('2023-03-17T12:06:30.439'), '600002', 45.1, 5]
[numpy.datetime64('2023-03-17T12:06:30.439'), '000905', 55.1, 6]

在流订阅中,API 内部仅使用 PROTOCOL_DDB 协议进行反序列化。在单条订阅模式下,DolphinDB 发送的数据由 API 接收后,每一行数据将从 AnyVector 转换为 list。有关 AnyVector 转换的详细说明,请参考章节 PROTOCOL_DDB

批量订阅(设置 msgAsTable=False

若要使用批量订阅模式,则须指定参数 batchSizethrottle,表示当接收到的消息条数超过 batchSize,或者处理消息前的等待时间超过 throttle,则会触发一次回调,将数据传递给 handler,并且按批次来处理数据。当指定 msgAsTable=False 时,收到的一批数据将是一个列表 list,其中每一项都是单条数据,结构和单条订阅模式中的一条数据一致。

在下例中,分别指定 batchSize=2throttle=0.1,表示在 0.1 秒时间内,如果收到了 2 条数据,则立刻调用回调函数传入这 2 条数据;如果等待的 0.1 秒仅收到 1 条数据,则会在等待结束后调用回调函数传入这 1 条数据。与单条订阅模式相似,批量订阅模式下通过 PROTOCOL_DDB 协议进行数据类型转换,每条数据将从 AnyVector 转为 list。

import dolphindb as ddb
import numpy as np
import time

s = ddb.session()
s.connect("192.168.1.113", 8848, "admin", "123456")

s.run("""
share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
""")

s.enableStreaming()

def handler(lsts):
    print(lsts)

s.subscribe("192.168.1.113", 8848, handler, "trades", "MultiMode1", offset=-1, batchSize=2, throttle=0.1, msgAsTable=False)

s.run("insert into trades values(take(now(), 6), take(`000905`600001`300201`000908`600002, 6), rand(1000,6)/10.0, 1..6)")

time.sleep(3)

s.unsubscribe("192.168.1.113", 8848, "trades", "MultiMode1")

输出结果如下:

[[numpy.datetime64('2023-03-17T14:46:27.358'), '000905', 21.2, 1], [numpy.datetime64('2023-03-17T14:46:27.358'), '600001', 39.8, 2]]
[[numpy.datetime64('2023-03-17T14:46:27.358'), '300201', 84.0, 3], [numpy.datetime64('2023-03-17T14:46:27.358'), '000908', 26.2, 4]]
[[numpy.datetime64('2023-03-17T14:46:27.358'), '600002', 25.1, 5], [numpy.datetime64('2023-03-17T14:46:27.358'), '000905', 42.7, 6]]

批量订阅(设置 msgAsTable=True

开启批量订阅时,如果指定 msgAsTable=True,则每一批数据将基于消息块(由 DolphinDB 中的参数 maxMsgNumPerBlock 进行配置)处理消息。当收到的记录总数大于等于 batchSize 时,handler 会对所有达到条件的消息块进行处理。

下例中,在开启批量订阅模式后,调用 session.run 执行脚本,向流数据表中写入 1500 条数据,此时 DolphinDB 中的参数 maxMsgNumPerBlock 为默认值 1024,因此 API 接收到 1024 条数据后,消息条数恰好超过 batchSize=1000,立刻调用回调函数;随后收到剩下的 476 条数据,等待 0.1 秒仍无新数据,再次调用回调函数。因此最后的输出结果为两个长度分别为 1024 和 476 的 DataFrame。

import dolphindb as ddb
import numpy as np
import time

s = ddb.session()
s.connect("192.168.1.113", 8848, "admin", "123456")

s.run("""
share streamTable(10000:0,`time`sym`price`id, [TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades
""")

s.enableStreaming()

def handler(lsts):
    print(lsts)

s.subscribe("192.168.1.113", 8848, handler, "trades", "MultiMode2", offset=-1, batchSize=1000, throttle=0.1, msgAsTable=True)

s.run("n=1500;insert into trades values(take(now(), n), take(`000905`600001`300201`000908`600002, n), rand(1000,n)/10.0, 1..n)")

time.sleep(3)

s.unsubscribe("192.168.1.113", 8848, "trades", "MultiMode2")

如果修改上述示例中的 batchSize 为 1500,发送的数据为 3000 条,服务端发送第一个消息块(长度为 1024)后,不触发回调函数;服务端发送第二个消息块(长度为 1024)后,API 收到的数据条数共为 2048,超过 batchSize=1500,立刻触发回调函数,通过 PROTOCOL_DDB 协议将收到的消息从 Table 转换为 pandas.DataFrame;服务端发送第三个消息块(长度为 952)后,经过 0.1 秒,仍没有接收到新数据,此时触发回调函数。在这种情况下,回调函数中收到的数据,长度分别为 2048 和 952。

异构流数据表订阅

DolphinDB 自 1.30.17 及 2.00.5 版本开始,支持通过 replay 函数将多个结构不同的流数据表回放(序列化)到一个流数据表里,这个流数据表被称为异构流数据表。Python API 自 1.30.19 版本开始新增 streamDeserializer 类,用于构造异构流数据表反序列化器,以实现对异构流数据表的订阅和反序列化操作。

异构流数据表反序列化器

Python API 通过 streamDeserializer 类来构造异构流数据表反序列化器,接口定义如下:

streamDeserializer(sym2table, session=None)
  • sym2table:字典对象,其结构与 replay 回放到异构流数据表的输入表结构保持一致。streamDeserializer 将根据 sym2table 指定的结构对注入的数据进行反序列化。
  • session:已连接 DolphinDB 的 session 对象,默认为 None。如果不指定,将会在订阅时自动获取当前连接。

下例构造一个简单的异构流数据表反序列化器:

sd = ddb.streamDeserializer({
    'msg1': ["dfs://test_StreamDeserializer_pair", "pt1"],
    'msg2': "pt2",
}, session=s)

其中,sym2table 的键为不同输入表的标记,用于区分不同输入表的数据;sym2table 的值为表名,或由分区数据库地址和表名组成的列表(或元组)。订阅时,会通过构造时传入的 session 调用 schema 方法获得 sym2table 键值对应的表的结构,因此并不一定需要填输入表名,只需要和输入表结构一致即可。

关于构造 DolphinDB 异构流数据表的具体脚本,请参照异构回放示例

注意

  1. 在 DolphinDB 中构造异构流数据表时,字典中 key 对应的表应为内存表或 replayDS 定义的数据源,请参考 replay
  2. API 端构造异构流数据表反序列化器时,sym2table 的值对应的表(可以为分区表、流数据表或者内存表)结构需要和 DolphinDB 中构造异构流数据表使用的表结构一致。
  3. 订阅异构流数据表时,msgAsTable 不能为 True,可以指定 batchSizethrottle

订阅示例 1 (分区表数据源作为输入表)

下例中,首先在 DolphinDB 中定义由两个分区表组合而成的异构流数据表,然后在 Python 客户端定义异构流数据表反序列化器 sd,再根据 sd 中指定表的结构反序列化数据。在输出结果中,每条数据的末尾都增加了一个字段,用于标识当前数据的 symbol。

构造异构流数据表

首先在 DolphinDB 中定义输出表,即要订阅的异构流数据表。

try{dropStreamTable(`outTables)}catch(ex){}
share streamTable(100:0, `timestampv`sym`blob`price1,[TIMESTAMP,SYMBOL,BLOB,DOUBLE]) as outTables

然后定义两张输入表,均为分布式分区表。

n = 6;
dbName = 'dfs://test_StreamDeserializer_pair'
if(existsDatabase(dbName)){
    dropDB(dbName)}
db = database(dbName,RANGE,2012.01.01 2013.01.01 2014.01.01 2015.01.01 2016.01.01 2017.01.01 2018.01.01 2019.01.01)
table1 = table(100:0, `datetimev`timestampv`sym`price1`price2, [DATETIME, TIMESTAMP, SYMBOL, DOUBLE, DOUBLE])
table2 = table(100:0, `datetimev`timestampv`sym`price1, [DATETIME, TIMESTAMP, SYMBOL, DOUBLE])
tableInsert(table1, 2012.01.01T01:21:23 + 1..n, 2018.12.01T01:21:23.000 + 1..n, take(`a`b`c,n), rand(100,n)+rand(1.0, n), rand(100,n)+rand(1.0, n))
tableInsert(table2, 2012.01.01T01:21:23 + 1..n, 2018.12.01T01:21:23.000 + 1..n, take(`a`b`c,n), rand(100,n)+rand(1.0, n))
pt1 = db.createPartitionedTable(table1,'pt1',`datetimev).append!(table1)
pt2 = db.createPartitionedTable(table2,'pt2',`datetimev).append!(table2)

将分区表转为数据源后进行回放。

re1 = replayDS(sqlObj=<select * from pt1>, dateColumn=`datetimev, timeColumn=`timestampv)
re2 = replayDS(sqlObj=<select * from pt2>, dateColumn=`datetimev, timeColumn=`timestampv)
d = dict(['msg1', 'msg2'], [re1, re2])
replay(inputTables=d, outputTables=`outTables, dateColumn=`timestampv, timeColumn=`timestampv)

订阅异构流表

import dolphindb as ddb

# 异构流表反序列化器返回的数据末尾为异构流表反序列化器中 sym2table 指定的 key
def streamDeserializer_handler(lst):
    if lst[-1]=="msg1":
        print("Msg1: ", lst)
    elif lst[-1]=='msg2':
        print("Msg2: ", lst)
    else:
        print("Error: ", lst)

s = ddb.session()
s.connect("192.168.1.113", 8848, "admin", "123456")
s.enableStreaming()

# 填入分区表数据库路径和表名的 list,以获取对应表结构
sd = ddb.streamDeserializer({
    'msg1': ["dfs://test_StreamDeserializer_pair", "pt1"],
    'msg2': ["dfs://test_StreamDeserializer_pair", "pt2"],
}, session=s)
s.subscribe(host="192.168.1.113", port=8848, handler=streamDeserializer_handler, tableName="outTables", actionName="action", offset=0, resub=False,
            msgAsTable=False, streamDeserializer=sd, userName="admin", password="123456")

from threading import Event
Event().wait()

输出结果如下所示:

Msg2:  [numpy.datetime64('2012-01-01T01:21:24'), numpy.datetime64('2018-12-01T01:21:23.001'), 'a', 18.43745171907358, 'msg2']
Msg1:  [numpy.datetime64('2012-01-01T01:21:24'), numpy.datetime64('2018-12-01T01:21:23.001'), 'a', 65.69160503265448, 41.17562178615481, 'msg1']
Msg2:  [numpy.datetime64('2012-01-01T01:21:25'), numpy.datetime64('2018-12-01T01:21:23.002'), 'b', 93.68146854126826, 'msg2']
Msg1:  [numpy.datetime64('2012-01-01T01:21:25'), numpy.datetime64('2018-12-01T01:21:23.002'), 'b', 22.181119214976206, 38.162505637388676, 'msg1']
Msg2:  [numpy.datetime64('2012-01-01T01:21:26'), numpy.datetime64('2018-12-01T01:21:23.003'), 'c', 51.19852650281973, 'msg2']
Msg1:  [numpy.datetime64('2012-01-01T01:21:26'), numpy.datetime64('2018-12-01T01:21:23.003'), 'c', 16.937458558939397, 36.79589221812785, 'msg1']
Msg2:  [numpy.datetime64('2012-01-01T01:21:27'), numpy.datetime64('2018-12-01T01:21:23.004'), 'a', 0.812068443512544, 'msg2']
Msg1:  [numpy.datetime64('2012-01-01T01:21:27'), numpy.datetime64('2018-12-01T01:21:23.004'), 'a', 34.11729482654482, 29.094212289899588, 'msg1']
Msg2:  [numpy.datetime64('2012-01-01T01:21:28'), numpy.datetime64('2018-12-01T01:21:23.005'), 'b', 93.43341179518029, 'msg2']
Msg1:  [numpy.datetime64('2012-01-01T01:21:28'), numpy.datetime64('2018-12-01T01:21:23.005'), 'b', 9.413380537647754, 32.449754945002496, 'msg1']
Msg2:  [numpy.datetime64('2012-01-01T01:21:29'), numpy.datetime64('2018-12-01T01:21:23.006'), 'c', 65.18307867064141, 'msg2']
Msg1:  [numpy.datetime64('2012-01-01T01:21:29'), numpy.datetime64('2018-12-01T01:21:23.006'), 'c', 83.58133838768117, 54.27990723075345, 'msg1']

订阅示例 2 (内存表作为输入表)

下例中,在 DolphinDB 中定义了一个由两个内存表构成的异构流数据表,并在 Python 端使用共享内存表的表名构造反序列化器,最后指定 batchSize=4 进行批量订阅。可以看出,在总数据条数为 6*2=12 的情况下,数据首先按总条数分 3 批传入回调函数,在每批数据中,每条数据可能来自不同的输入表。因此,共调用回调函数 3 次,每次输出4条数据构成的一批数据。

构造异构流表

try{dropStreamTable(`outTables)}catch(ex){}
// 构造输出流数据表
share streamTable(100:0, `timestampv`sym`blob`price1,[TIMESTAMP,SYMBOL,BLOB,DOUBLE]) as outTables

n = 6;
table1 = table(100:0, `datetimev`timestampv`sym`price1`price2, [DATETIME, TIMESTAMP, SYMBOL, DOUBLE, DOUBLE])
table2 = table(100:0, `datetimev`timestampv`sym`price1, [DATETIME, TIMESTAMP, SYMBOL, DOUBLE])
tableInsert(table1, 2012.01.01T01:21:23 + 1..n, 2018.12.01T01:21:23.000 + 1..n, take(`a`b`c,n), rand(100,n)+rand(1.0, n), rand(100,n)+rand(1.0, n))
tableInsert(table2, 2012.01.01T01:21:23 + 1..n, 2018.12.01T01:21:23.000 + 1..n, take(`a`b`c,n), rand(100,n)+rand(1.0, n))
share table1 as pt1
share table2 as pt2

d = dict(['msg1', 'msg2'], [pt1, pt2])
replay(inputTables=d, outputTables=`outTables, dateColumn=`timestampv, timeColumn=`timestampv)

订阅异构流表

import dolphindb as ddb

def streamDeserializer_handler(lsts):
    print(lsts)

s = ddb.session()
s.connect("192.168.1.113", 8848, "admin", "123456")
s.enableStreaming()

sd = ddb.streamDeserializer({
    'msg1': "pt1",
    'msg2': "pt2",
}, session=s)
s.subscribe(host="192.168.1.113", port=8848, handler=streamDeserializer_handler, tableName="outTables", actionName="action", offset=0, resub=False, batchSize=4,
            msgAsTable=False, streamDeserializer=sd, userName="admin", password="123456")

from threading import Event
Event().wait()

输出结果如下所示:

[[numpy.datetime64('2012-01-01T01:21:24'), numpy.datetime64('2018-12-01T01:21:23.001'), 'a', 87.90784921264276, 'msg2'], [numpy.datetime64('2012-01-01T01:21:24'), numpy.datetime64('2018-12-01T01:21:23.001'), 'a', 14.867915444076061, 92.22166634746827, 'msg1'], [numpy.datetime64('2012-01-01T01:21:25'), numpy.datetime64('2018-12-01T01:21:23.002'), 'b', 80.60459423460998, 'msg2'], [numpy.datetime64('2012-01-01T01:21:25'), numpy.datetime64('2018-12-01T01:21:23.002'), 'b', 10.429520844481885, 29.480175042990595, 'msg1']]
[[numpy.datetime64('2012-01-01T01:21:26'), numpy.datetime64('2018-12-01T01:21:23.003'), 'c', 12.45058359648101, 'msg2'], [numpy.datetime64('2012-01-01T01:21:26'), numpy.datetime64('2018-12-01T01:21:23.003'), 'c', 55.05597074679099, 88.84371786634438, 'msg1'], [numpy.datetime64('2012-01-01T01:21:27'), numpy.datetime64('2018-12-01T01:21:23.004'), 'a', 27.357952459948137, 'msg2'], [numpy.datetime64('2012-01-01T01:21:27'), numpy.datetime64('2018-12-01T01:21:23.004'), 'a', 57.705578718334436, 25.98224212951027, 'msg1']]
[[numpy.datetime64('2012-01-01T01:21:28'), numpy.datetime64('2018-12-01T01:21:23.005'), 'b', 63.73548944480717, 'msg2'], [numpy.datetime64('2012-01-01T01:21:28'), numpy.datetime64('2018-12-01T01:21:23.005'), 'b', 65.34572763741016, 0.6374575316440314, 'msg1'], [numpy.datetime64('2012-01-01T01:21:29'), numpy.datetime64('2018-12-01T01:21:23.006'), 'c', 89.62549424753524, 'msg2'], [numpy.datetime64('2012-01-01T01:21:29'), numpy.datetime64('2018-12-01T01:21:23.006'), 'c', 98.75018240674399, 46.55078419903293, 'msg1']]