ThreadPooledClient

ThreadPooledClient 支持用户创建指定数量的多个线程。每当发布端的流数据到达订阅端时,如果存在空闲的线程,则从空闲的线程中选择一个来调用回调函数。故当数据到达的间隔时间小于回调函数的处理时间时,ThreadPooledClient 比 ThreadedClient 有优势。本小节将从构造函数、订阅和取消订阅对 ThreadPooledClient 进行介绍,并展示一个使用示例。

构造函数

ThreadPooledClient(int listeningPort, int threadCount)
  • listeningPort:表示多线程客户端节点的订阅端口号。

  • threadCount:表示线程池创建的线程数量。

注: 2.00.9 及之后版本的 server 发布端通过订阅端的请求连接推送数据,不再需要接收端指定端口,参数 listeningPort 填 0 即可

订阅

vector<ThreadSP> ThreadPooledClient::subscribe(
    string host,
    int port,
    const MessageHandler &handler,
    string tableName,
    string actionName,
    int64_t offset = -1,
    bool resub = true,
    const VectorSP &filter = nullptr,
    bool msgAsTable = false,
    bool allowExists = false, 
    string userName = "",
    string password = "",
    const StreamDeserializerSP &blobDeserializer = nullptr,
    const std::vector<std::string>& backupSites = std::vector<std::string>(),
    int resubTimeout = 100,
    bool subOnce = false
)

参数说明

参考:ThreadedClient 参数

返回类型

返回一个指针向量,每个指针指向循环调用handler的线程。这些线程在此topic被取消订阅后会退出。

取消订阅

void unsubscribe(
    string host,
    int port,
    string tableName,
    string actionName
)

参数值需要与订阅时的填入参数值相同。

使用示例

DolphinDB 脚本:新建一个Stream表,然后共享该表,名为shared_stream_table

rt = streamTable(`XOM`GS`AAPL as id, 102.1 33.4 73.6 as x)
share rt as shared_stream_table

C++ 代码

本例使用2.00.10版本的DolphinDB server

#include <iostream>
#include "Streaming.h"
using namespace dolphindb;

int main(int argc, const char **argv)
{
    auto handler = [](Message msg){
        std::cout << msg->getString() << std::endl;
    };
    ThreadPooledClient client(0, 10);
    auto t = client.subscribe("127.0.0.1", 8848, handler, "shared_stream_table", "action2", 0);
    sleep(10);
    client.unsubscribe("127.0.0.1", 8848, "shared_stream_table", "action2");
    return 0;
}