事件处理

发送事件

DolphinDB 自 3.00.00 版本起支持 CEP 引擎,该引擎可以订阅一个异构流表,该表中每一行代表一个事件。每当异构流表中新增一行,CEP 引擎都会收到通知并处理该事件。

DolphinDB C++ API 能够将事件写入到上述异构流表中,从而使 CEP 引擎处理该事件。

相关头文件

在代码中包含下述头文件才能发送事件:

#include <EventHandler.h>

类:EventSchema

该类用于表示一个事件的结构。

struct EventSchema{
    std::string                 eventType_;                   //事件类型
    std::vector<std::string>    fieldNames_;                  //字段名
    std::vector<DATA_TYPE>      fieldTypes_;                  //字段数据类型
    std::vector<DATA_FORM>      fieldForms_;                  //字段数据形式
    std::vector<int>            fieldExtraParams_;            //额外参数,当字段类型为DECIMAL时,用来指定scale
};

类:EventSender

该类用于实现事件发送功能。

EventSender(const std::vector<EventSchema>& eventSchema, const std::vector<std::string>& eventTimeFields, const std::vector<std::string>& commonFields);

其构造函数的参数如下:

  • eventSchema:表示CEP引擎可以处理的所有事件结构,也就是可能会发送的所有事件结构。注意,事件结构必须与创建CEP引擎时指定的结构相同,否则无法正确处理。
  • eventTimeFields
    • 如果待写入的异构流表第一列是时间类型,就必须要指定每个事件中的时间类型字段。如果所有的事件中时间字段名字相同,eventTimeFields可以只包含一个元素,否则必须为每个事件都指定一个时间字段。
    • 如果待写入的异构流表第一列不是时间类型,则填空向量即可
  • commonFields
    • 如果待写入的异构流表存在common列,则需要按顺序指定要存在common列中的字段名
    • 如果待写入的异构流表不存在common列,则填空向量即可

连接方法

void connect(DBConnectionSP conn, const std::string& tableName);

其参数如下:

  • conn 表示已连接成功的 DBConnection 对象指针。
    Note: 该 BConnection 不能是异步模式
  • tableName 表示要写入的异构流表的名字。

发送事件方法

void sendEvent(const std::string& eventType, const std::vector<ConstantSP>& attributes);

其参数如下:

  • eventType 表示事件类型。
  • attributes 表示事件中各成员的值,值定义的顺序必须与定义 EventSchema 结构时的相同,并且类型和形式也要一一匹配。

实现代码示例

#include "EventHandler.h"
int main() {
    EventSchema scheme;
    scheme.eventType_ = "EventVectorInt";
    scheme.fieldNames_ = {"custom", "eventTime"};
    scheme.fieldTypes_ = {DT_INT, DT_TIMESTAMP};
    scheme.fieldForms_ = {DF_SCALAR, DF_SCALAR};
    scheme.fieldExtraParams_ = {0, 0};
    std::vector<EventSchema> eventSchemes{scheme};
    std::vector<std::string> eventTimeKeys{"eventTime"};
    std::vector<std::string> commonKeys{"custom"};
    auto sender = new EventSender(eventSchemes, eventTimeKeys, commonKeys);
    DBConnectionSP conn = new DBConnection;
    conn->connect("127.0.0.1", 8848, "admin", "123456");
    sender->connect(conn, "input");
    std::vector<ConstantSP> attributes;
    VectorSP v = Util::createVector(DT_INT, 0, 10);
    int ss = 1;
    v->appendInt(&ss, 1);
    v->appendInt(&ss, 1);
    attributes.push_back(v);
    attributes.push_back(Util::createTimestamp(112));
    sender->sendEvent("EventVectorInt", attributes);
    delete sender;
}

订阅事件

CEP 引擎可以关联一个异构流表做为输出表,并将事件写入到该异构流表

DolphinDB C++ API 可以订阅该异构流表,当有数据写入该表时,C++ API 可以收到通知。

相关头文件

在代码中包含下述头文件才能订阅事件:

#include "Streaming.h"

类:EventClient

该类用于实现事件订阅功能。其构造函数如下:

EventClient(const std::vector<EventSchema>& eventSchema, const std::vector<std::string>& eventTimeFields, const std::vector<std::string>& commonFields);

参数说明:

  • eventSchema:表示CEP引擎可以处理的所有事件结构,也就是可能会发送的所有事件结构。注意,事件结构必须与创建CEP引擎时指定的结构相同,否则无法正确处理。
  • eventTimeFields
    • 如果待写入的异构流表第一列是时间类型,就必须要指定每个事件中的时间类型字段。如果所有的事件中时间字段名字相同,eventTimeFields可以只包含一个元素,否则必须为每个事件都指定一个时间字段。
    • 如果待写入的异构流表第一列不是时间类型,则填空向量即可
  • commonFields
    • 如果待写入的异构流表存在common列,则需要按顺序指定要存在common列中的字段名
    • 如果待写入的异构流表不存在common列,则填空向量即可

订阅方法

ThreadSP subscribe(const string& host, int port, const EventMessageHandler &handler, const string& tableName, const string& actionName = DEFAULT_ACTION_NAME, int64_t offset = -1,
        bool resub = true, const string& userName="", const string& password="");

其参数如下:

  • host 用于指定 DolphinDB Server 的 IP 地址
  • port 用于指定 DolphinDB Server 的端口
  • handler 表示回调函数,在收到事件时会调用以通知用户
  • tableName 异构流表名
  • actionName 任务名称
  • offset 表示订阅偏移量,设置为 -1 时表示从最新的开始
  • resub 用于设置网络断开时是否重订阅
  • userName 为用户名
  • password 为用户密码

其回调函数如下:

using EventMessageHandler = std::function<void(const std::string&, std::vector<ConstantSP>&)>;

其中第一个参数表示事件名称,第二个参数是各个成员的值。

取消订阅的方法如下:

void unsubscribe(const string& host, int port, const string& tableName, const string& actionName = DEFAULT_ACTION_NAME);

其中参数如下:

  • host 用于指定 DolphinDB Server 的 IP 地址
  • port 用于指定 DolphinDB Server 的端口
  • tableName 用于指定异构流表名
  • actionName 用于指定任务名称

实现代码示例

#include "Streaming.h"
using namespace dolphindb
int main() {
    EventSchema scheme;
    scheme.eventType_ = "EventVectorBool";
    scheme.fieldNames_ = {"custom", "eventTime"};
    scheme.fieldTypes_ = {DT_INT, DT_TIMESTAMP};
    scheme.fieldForms_ = {DF_VECTOR, DF_SCALAR};
    scheme.fieldExtraParams_ = {0, 0};
    std::vector<EventSchema> eventSchemes{scheme};
    std::vector<std::string> eventTimeKeys{"eventTime"};
    std::vector<std::string> commonKeys{"custom"};
    EventMessageHandler handler = [](const std::string& eventType, std::vector<ConstantSP>& datas){
        std::cout << "in  handler!\n";
        std::cout << "eventType " << eventType << std::endl;
        for(auto& data : datas){
            std::cout << "data " << data->getString() << std::endl;
        }
    };
    EventClient client(eventSchemes, eventTimeKeys, commonKeys);
    client.subscribe("127.0.0.1", 8848, handler, "output", "ss1", 0);
    Util::sleep(10000);
    client.unsubscribe("127.0.0.1", 8848, "output", "ss1");
    return 0;
}