ThreadPooledClient
Note:
该类已不推荐使用,原因如下:
-
如果回调处理时间大于数据间隔,请检查回调函数是否有耗时操作(例如写文件、网络传输等)。
-
如果无法及时处理数据,建议优先尝试下列方案:
-
建议将您的数据处理过程调整到回调函数之外进行,例如您可以在回调中将数据保存为自己的数据类型,然后创建多个业务线程进行处理。
-
使用 PollingClient,创建多个线程进行读取与处理。
-
ThreadPooledClient 支持用户创建指定数量的多个线程。每当发布端的流数据到达订阅端时,如果存在空闲的线程,则从空闲的线程中选择一个来调用回调函数。故当数据到达的间隔时间小于回调函数的处理时间时,ThreadPooledClient 比 ThreadedClient 有优势。本小节将从构造函数、订阅和取消订阅对 ThreadPooledClient 进行介绍,并展示一个使用示例。
构造函数
ThreadPooledClient(int listeningPort = 0, int threadCount = 3)
listeningPort
:表示多线程客户端节点的订阅端口号。threadCount
:表示线程池创建的线程数量。
Note: 2.00.9 及之后版本的 server 发布端通过订阅端的请求连接推送数据,不再需要接收端指定端口,参数
listeningPort 填 0 或者不填即可
订阅
vector<ThreadSP> ThreadPooledClient::subscribe(
string host,
int port,
const MessageHandler &handler,
string tableName,
string actionName,
int64_t offset = -1,
bool resub = true,
const VectorSP &filter = nullptr,
bool msgAsTable = false,
bool allowExists = false,
string userName = "",
string password = "",
const StreamDeserializerSP &blobDeserializer = nullptr,
const std::vector<std::string>& backupSites = std::vector<std::string>(),
int resubscribeInterval = 100,
bool subOnce = false,
int resubscribeTimeout = 0
)
参数说明
返回类型
返回一个指针向量,每个指针指向循环调用handler的线程。这些线程在此topic被取消订阅后会退出。
取消订阅
bool unsubscribe(
string host,
int port,
string tableName,
string actionName
)
参数值需要与订阅时的填入参数值相同。
取消订阅成功时返回 true,否则返回 false。
使用示例
DolphinDB 脚本:新建一个Stream表,然后共享该表,名为shared_stream_table
。
rt = streamTable(`XOM`GS`AAPL as id, 102.1 33.4 73.6 as x)
share rt as shared_stream_table
C++ 代码:
本例使用2.00.10版本的DolphinDB server
#include <iostream>
#include "Streaming.h"
using namespace dolphindb;
int main(int argc, const char **argv)
{
auto handler = [](Message msg){
std::cout << msg->getString() << std::endl;
};
ThreadPooledClient client(0, 10);
auto t = client.subscribe("127.0.0.1", 8848, handler, "shared_stream_table", "action2", 0);
sleep(10);
client.unsubscribe("127.0.0.1", 8848, "shared_stream_table", "action2");
return 0;
}