ThreadPooledClient
ThreadPooledClient 支持用户创建指定数量的多个线程。每当发布端的流数据到达订阅端时,如果存在空闲的线程,则从空闲的线程中选择一个来调用回调函数。故当数据到达的间隔时间小于回调函数的处理时间时,ThreadPooledClient 比 ThreadedClient 有优势。本小节将从构造函数、订阅和取消订阅对 ThreadPooledClient 进行介绍,并展示一个使用示例。
构造函数
ThreadPooledClient(int listeningPort, int threadCount)
-
listeningPort
:表示多线程客户端节点的订阅端口号。 -
threadCount
:表示线程池创建的线程数量。
注: 2.00.9 及之后版本的 server 发布端通过订阅端的请求连接推送数据,不再需要接收端指定端口,参数
listeningPort 填 0 即可
订阅
vector<ThreadSP> ThreadPooledClient::subscribe(
string host,
int port,
const MessageHandler &handler,
string tableName,
string actionName,
int64_t offset = -1,
bool resub = true,
const VectorSP &filter = nullptr,
bool msgAsTable = false,
bool allowExists = false,
string userName = "",
string password = "",
const StreamDeserializerSP &blobDeserializer = nullptr,
const std::vector<std::string>& backupSites = std::vector<std::string>(),
int resubTimeout = 100,
bool subOnce = false
)
参数说明
返回类型
返回一个指针向量,每个指针指向循环调用handler的线程。这些线程在此topic被取消订阅后会退出。
取消订阅
void unsubscribe(
string host,
int port,
string tableName,
string actionName
)
参数值需要与订阅时的填入参数值相同。
使用示例
DolphinDB 脚本:新建一个Stream表,然后共享该表,名为shared_stream_table
。
rt = streamTable(`XOM`GS`AAPL as id, 102.1 33.4 73.6 as x)
share rt as shared_stream_table
C++ 代码:
本例使用2.00.10版本的DolphinDB server
#include <iostream>
#include "Streaming.h"
using namespace dolphindb;
int main(int argc, const char **argv)
{
auto handler = [](Message msg){
std::cout << msg->getString() << std::endl;
};
ThreadPooledClient client(0, 10);
auto t = client.subscribe("127.0.0.1", 8848, handler, "shared_stream_table", "action2", 0);
sleep(10);
client.unsubscribe("127.0.0.1", 8848, "shared_stream_table", "action2");
return 0;
}