PollingClient
PollingClient 在订阅数据时,会返回一个消息队列,用户可以从该消息队列中获取和处理数据。本小节将从构造函数、订阅和取消订阅对 PollingClient 进行介绍,并展示一个使用示例。
构造方法
PollingClient(int listeningPort = 0)
其中,listeningPort
表示单线程客户端的订阅端口号。
Note: 2.00.9 及之后版本的 server 发布端通过订阅端的请求连接推送数据,不再需要接收端指定端口,参数
listeningPort 填 0 或者不填即可。
订阅
MessageQueueSP subscribe(
string host,
int port,
string tableName,
string actionName,
int64_t offset = -1,
bool resub = true,
const VectorSP &filter = nullptr,
bool msgAsTable = false,
bool allowExists = false,
string userName="",
string password="",
const StreamDeserializerSP &blobDeserializer = nullptr,
const std::vector<std::string>& backupSites = std::vector<std::string>(),
int resubscribeInterval = 100,
bool subOnce = false,
int resubscribeTimeout = 0
)
参数
host
:表示发布端节点的主机名。port
:表示发布端节点的端口号。tableName
:表示发布端上共享流数据表的名称。actionName
:表示订阅任务的名称。传入值可以包含字母、数字和下划线。offset
:表示订阅任务开始后的第一条消息所在的位置。消息是流数据表中的行。如果没有指定 offset、或其为负数、或超过了流数据表的记录行数,则订阅将会从流数据表的当前行开始。offset 与流数据表创建时的第一行对应。如果某些行因为内存限制被删除,在决定订阅开始的位置时,这些行仍然考虑在内。resub
:表示订阅中断后,API 是否会自动重订阅。filter
:一个向量,表示过滤条件。流数据表过滤列在 filter 中的数据才会发布到订阅端,不在 filter 中的数据不会发布。msgAsTable
:只有设置了 batchSize 参数,该参数才会生效。若其设置为 true,则订阅的数据会转换为 Table;若其设置为 false,则订阅的数据会转换成 AnyVector。allowExists
:若该参数设置为 true,则支持对同一个订阅流表使用多个 handler 处理数据;若其设置为 False,则不支持同一个订阅流表使用多个 handler 处理数据。userName
:表示 API 所连接服务器的登录用户名。password
:表示 API 所连接服务器的登录密码。blobDeserializer
:表示订阅的异构流表对应的反序列化器。backupSites
: 字符串列表,可选,表示备用的发布端节点列表,由节点 ip 和端口号组成,例如 [“192.168.0.1:8848“, “192.168.0.2:8849“]。- 指定backupSites,表示启动主备节点切换。如果发生节点切换(例如连接断开),会在可用节点列表中不断轮询订阅,直至订阅成功。
- 若配置该参数,用户需保证主节点(由 host 和 port 参数指定)和备用节点上存在相同结构、相同数据的同名流数据表。否则,可能出现订阅的数据不符合预期。
- 若订阅的是高可用流表,重连时将从 backupSites 指定的节点列表中选择节点。
- 取消订阅需使用主节点 ip 和端口号进行取消。
resubscribeInterval
:一个非负整数,表示在断连后每次尝试进行重新订阅的最小时间间隔(单位毫秒),默认值为 100。subOnce
:布尔值,False 表示在节点切换时,不去尝试之前成功连接过的节点。默认值为 False。resubscribeTimeout
:整型,表示重订阅的超时时间(单位毫秒)。默认值为0,表示无限重试。建议配置 resub=true 时,同时配置该参数,以防止重订阅过程耗时太长而占用过多系统资源。
返回类型
该函数返回指向消息队列的指针。
取消订阅
void unsubscribe(
string host,
int port,
string tableName,
string actionName
)
Note: 参数值需要与订阅时填入参数值相同。
使用示例
DolphinDB 脚本:新建一个Stream表,然后共享该表,名为shared_stream_table
。
rt = streamTable(`XOM`GS`AAPL as id, 102.1 33.4 73.6 as x)
share rt as shared_stream_table
C++代码:
本例使用2.00.10版本的DolphinDB server,在订阅一秒后取消订阅
int main(int argc, const char **argv)
{
PollingClient client(0);
MessageQueueSP queue = client.subscribe("127.0.0.1", 8848, "shared_stream_table", "action3", 0);
ThreadSP t = new Thread(new Executor([&client](){
Util::sleep(1000);
client.unsubscribe("127.0.0.1", 8848, "shared_stream_table", "action3");
}));
t->start();
Message msg;
while (queue->poll(msg, 1000) && !msg.isNull()) {
std::cout << msg->getString() << std::endl;
}
}