事件处理
发送事件
DolphinDB 自 3.00.00 版本起支持 CEP 引擎,该引擎可以订阅一个异构流表,该表中每一行代表一个事件。每当异构流表中新增一行,CEP 引擎都会收到通知并处理该事件。
DolphinDB C++ API 能够将事件写入到上述异构流表中,从而使 CEP 引擎处理该事件。
相关头文件
在代码中包含下述头文件才能发送事件:
#include <EventHandler.h>
类:EventSchema
该类用于表示一个事件的结构。
struct EventSchema{
std::string eventType_; //事件类型
std::vector<std::string> fieldNames_; //字段名
std::vector<DATA_TYPE> fieldTypes_; //字段数据类型
std::vector<DATA_FORM> fieldForms_; //字段数据形式
std::vector<int> fieldExtraParams_; //额外参数,当字段类型为DECIMAL时,用来指定scale
};
类:EventSender
该类用于实现事件发送功能。
EventSender(const std::vector<EventSchema>& eventSchema, const std::vector<std::string>& eventTimeFields, const std::vector<std::string>& commonFields);
其构造函数的参数如下:
- eventSchema:表示CEP引擎可以处理的所有事件结构,也就是可能会发送的所有事件结构。注意,事件结构必须与创建CEP引擎时指定的结构相同,否则无法正确处理。
- eventTimeFields:
- 如果待写入的异构流表第一列是时间类型,就必须要指定每个事件中的时间类型字段。如果所有的事件中时间字段名字相同,eventTimeFields可以只包含一个元素,否则必须为每个事件都指定一个时间字段。
- 如果待写入的异构流表第一列不是时间类型,则填空向量即可
- commonFields:
- 如果待写入的异构流表存在common列,则需要按顺序指定要存在common列中的字段名
- 如果待写入的异构流表不存在common列,则填空向量即可
连接方法
void connect(DBConnectionSP conn, const std::string& tableName);
其参数如下:
- conn 表示已连接成功的 DBConnection 对象指针。 Note: 该 BConnection 不能是异步模式
- tableName 表示要写入的异构流表的名字。
发送事件方法
void sendEvent(const std::string& eventType, const std::vector<ConstantSP>& attributes);
其参数如下:
- eventType 表示事件类型。
- attributes 表示事件中各成员的值,值定义的顺序必须与定义 EventSchema 结构时的相同,并且类型和形式也要一一匹配。
实现代码示例
#include "EventHandler.h"
int main() {
EventSchema scheme;
scheme.eventType_ = "EventVectorInt";
scheme.fieldNames_ = {"custom", "eventTime"};
scheme.fieldTypes_ = {DT_INT, DT_TIMESTAMP};
scheme.fieldForms_ = {DF_SCALAR, DF_SCALAR};
scheme.fieldExtraParams_ = {0, 0};
std::vector<EventSchema> eventSchemes{scheme};
std::vector<std::string> eventTimeKeys{"eventTime"};
std::vector<std::string> commonKeys{"custom"};
auto sender = new EventSender(eventSchemes, eventTimeKeys, commonKeys);
DBConnectionSP conn = new DBConnection;
conn->connect("127.0.0.1", 8848, "admin", "123456");
sender->connect(conn, "input");
std::vector<ConstantSP> attributes;
VectorSP v = Util::createVector(DT_INT, 0, 10);
int ss = 1;
v->appendInt(&ss, 1);
v->appendInt(&ss, 1);
attributes.push_back(v);
attributes.push_back(Util::createTimestamp(112));
sender->sendEvent("EventVectorInt", attributes);
delete sender;
}
订阅事件
CEP 引擎可以关联一个异构流表做为输出表,并将事件写入到该异构流表
DolphinDB C++ API 可以订阅该异构流表,当有数据写入该表时,C++ API 可以收到通知。
相关头文件
在代码中包含下述头文件才能订阅事件:
#include "Streaming.h"
类:EventClient
该类用于实现事件订阅功能。其构造函数如下:
EventClient(const std::vector<EventSchema>& eventSchema, const std::vector<std::string>& eventTimeFields, const std::vector<std::string>& commonFields);
参数说明:
- eventSchema:表示CEP引擎可以处理的所有事件结构,也就是可能会发送的所有事件结构。注意,事件结构必须与创建CEP引擎时指定的结构相同,否则无法正确处理。
- eventTimeFields:
- 如果待写入的异构流表第一列是时间类型,就必须要指定每个事件中的时间类型字段。如果所有的事件中时间字段名字相同,eventTimeFields可以只包含一个元素,否则必须为每个事件都指定一个时间字段。
- 如果待写入的异构流表第一列不是时间类型,则填空向量即可
- commonFields:
- 如果待写入的异构流表存在common列,则需要按顺序指定要存在common列中的字段名
- 如果待写入的异构流表不存在common列,则填空向量即可
订阅方法
ThreadSP subscribe(const string& host, int port, const EventMessageHandler &handler, const string& tableName, const string& actionName = DEFAULT_ACTION_NAME, int64_t offset = -1,
bool resub = true, const string& userName="", const string& password="");
其参数如下:
- host 用于指定 DolphinDB Server 的 IP 地址
- port 用于指定 DolphinDB Server 的端口
- handler 表示回调函数,在收到事件时会调用以通知用户
- tableName 异构流表名
- actionName 任务名称
- offset 表示订阅偏移量,设置为 -1 时表示从最新的开始
- resub 用于设置网络断开时是否重订阅
- userName 为用户名
- password 为用户密码
其回调函数如下:
using EventMessageHandler = std::function<void(const std::string&, std::vector<ConstantSP>&)>;
其中第一个参数表示事件名称,第二个参数是各个成员的值。
取消订阅的方法如下:
void unsubscribe(const string& host, int port, const string& tableName, const string& actionName = DEFAULT_ACTION_NAME);
其中参数如下:
- host 用于指定 DolphinDB Server 的 IP 地址
- port 用于指定 DolphinDB Server 的端口
- tableName 用于指定异构流表名
- actionName 用于指定任务名称
实现代码示例
#include "Streaming.h"
using namespace dolphindb
int main() {
EventSchema scheme;
scheme.eventType_ = "EventVectorBool";
scheme.fieldNames_ = {"custom", "eventTime"};
scheme.fieldTypes_ = {DT_INT, DT_TIMESTAMP};
scheme.fieldForms_ = {DF_VECTOR, DF_SCALAR};
scheme.fieldExtraParams_ = {0, 0};
std::vector<EventSchema> eventSchemes{scheme};
std::vector<std::string> eventTimeKeys{"eventTime"};
std::vector<std::string> commonKeys{"custom"};
EventMessageHandler handler = [](const std::string& eventType, std::vector<ConstantSP>& datas){
std::cout << "in handler!\n";
std::cout << "eventType " << eventType << std::endl;
for(auto& data : datas){
std::cout << "data " << data->getString() << std::endl;
}
};
EventClient client(eventSchemes, eventTimeKeys, commonKeys);
client.subscribe("127.0.0.1", 8848, handler, "output", "ss1", 0);
Util::sleep(10000);
client.unsubscribe("127.0.0.1", 8848, "output", "ss1");
return 0;
}