事件订阅
DolphinDB 在 3.00.0 版本中支持了复杂事件处理引擎(Complex Event Processing Engine,简称 CEP 引擎),用于实时处理和分析复杂事件数据流。CEP 引擎支持订阅异构流表,该表中的每一行数据代表着一个事件。每当异构流表中新增一行数据,CEP 引擎都会收到通知并处理该事件。为配合使用该功能,Java API 自 3.00.0.0 版本起提供 EventSchema
, EventSender
和EventClient
,分别用于构造事件、将事件写入异构流表、以及从异构流表中订阅事件。
本节将详细介绍这三类工具的使用,并给出一个完整的使用示例。
- 将事件写入异构流表,作为服务端 CEP 引擎接收的数据源。
- 订阅服务端 CEP 引擎输出的异构流表中的事件。
EventSchema 构造事件
API 提供 EventSchema
工具,用于在客户端自定义事件。
public class EventSchema {
private String eventType;
private List<String> fieldNames;
private List<Entity.DATA_TYPE> fieldTypes;
private List<Entity.DATA_FORM> fieldForms;
private List<Integer> fieldExtraParams;
}
参数说明
eventType String 类型,表示事件。
fieldNames String 类型,表示字段名。
fieldTypes 表示字段数据类型。
fieldForms 表示字段数据形式。
fieldExtraParams 非负 int 类型,额外参数,当字段类型为 DECIMAL 时,用来指定 scale(DECIMAL 类型的特有参数,用于指定小数位数)。
EventSender 发送事件
API 提供 EventSender
工具,用于向存储事件信息的异构流表中写入序列化后的事件数据。DolphinDB 服务端的 CEP 引擎通常会订阅这些流表,以便捕获并处理相应的事件。
构造函数:
public EventSender(DBConnection conn, String tableName, List<EventSchema> eventSchemas, List<String> eventTimeFields, List<String> commonFields)
参数介绍
conn 表示已连接成功的 DBConnection 对象,注意该 DBConnection 不能是异步模式。
tableName String 类型,表示要写入的异构流表的名字。
eventSchemas 表示 CEP 引擎可以处理的所有事件结构,即可能会发送的所有事件结构。注意,事件结构必须与创建 CEP 引擎时指定的结构相同,否则无法正确处理。
eventTimeFields 可选参数,String 类型标量或String 类型列表,用来指定事件的时间字段。如果待写入的异构流表第一列是时间类型,则须指定每个事件中的时间类型字段。
- 如果所有事件中的时间字段名字相同,则该参数只需指定为一个String 类型标量。
- 如果各个事件的时间字段名称不同,则该参数需要指定为一个String 类型向量。其中各元素的顺序与 eventSchema 参数的传入顺序保持一致。
commonFields 可选参数,String 类型列表,表示事件中具有相同名称的字段。可以将事件之间相同的字段单独作为列存入流表中,以便在订阅中进行过滤。
sendEvent 发送
将事件发送至 DolphinDB 服务端。
public void sendEvent(String eventType, List<Entity> attributes)
参数说明
eventType String 类型,表示事件。
attributes 表示事件中各成员的值。注意,传入值的顺序必须与定义 EventSchema
结构时指定的字段顺序相同,且其数据类型和数据形式也须与之匹配。
EventClient 订阅事件
API 提供 EventClient
工具,用于订阅异构流表中事件。该异构流表通常接收 DolphinDB 服务端 CEP 引擎处理后并序列化输出的事件数据。
其构造方法如下,其参数与 EventSender
构造函数的相同:
public EventClient(List<EventSchema> eventSchemas, List<String> eventTimeFields, List<String> commonFields)
subscribe 订阅
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, String userName, String password)
参数说明
host String 类型,表示发布端节点的 IP 地址。
port String 类型,表示发布端节点的端口号。
tableName String 类型,表示异构流表的的名称。
actionName 可选参数,String 类型,表示订阅任务的名称,默认为 ”javaStreamingApi” 。
handler 表示用户自定义的回调函数,用于处理每次流入的消息。
offset 可选参数,int 类型,表示订阅任务开始后的第一条消息所在的位置,默认为 -1。一条消息对应流数据表中的一行。如果没有指定 offset,或它为负数,或超过了流数据表的记录行数,订阅将会从流数据表的当前行开始。offset 与流数据表创建时的第一行对应。如果某些行因为内存限制被删除,在决定订阅开始的位置时,这些行仍然考虑在内。具体可参考 DolphinDB 文档- subscribe。
reconnect 可选参数,boolean 类型,表示订阅中断后是否会自动重订阅,默认为 false,表示不订阅。
user 可选参数,String 类型,表示 Java API 所连接服务器的登录用户名,默认为 “” 表示空String 类型。
password 可选参数,String 类型,表示 Java API 所连接服务器的登录密码,默认为“” 表示空String 类型。
回调函数使用说明
使用时,第一个参数表示事件名称,第二个参数就是各个成员的值。
EventMessageHandler handler = new EventMessageHandler() {
@Override
public void doEvent(String eventType, List<Entity> attribute) {
// 处理逻辑
}
};
unsubscribe 取消订阅
public void unsubscribe(String host, int port, String tableName, String actionName)
参数说明
host String 类型,表示发布端节点的 IP 地址。
port int 类型,表示发布端节点的端口号。
tableName String 类型,表示异构流表名。
actionName String 类型,表示任务名称。
使用示例
本例将先介绍在 server 端创建事件、用于序列化事件的接口、分别用于写入和写出的异构流表、以及处理事件的 CEP 引擎;然后在 API 端使用 EventSender
向异构流表发送事件;当 CEP 引擎通过订阅接收并处理完数据后会向另一个异构流表输出消息, API 端再使用 EventClient
从该表订阅输出的事件。
Server 端创建基本要素
下述脚本将在 DolphinDB 服务端创建两个异构流表 input 和 output,两个 streamEventSerializer(序列化接口),一个 CEP 引擎,以及一个 MarketData 事件。
login("admin","123456");
class MarketData{
market :: STRING
code :: STRING
price :: DOUBLE
qty :: INT
eventTime :: TIMESTAMP
def MarketData(m,c,p,q){
market = m
code = c
price = p
qty = q
eventTime = now()
}
}
class MainMonitor{
def MainMonitor(){
}
def updateMarketData(event)
def onload(){
addEventListener(updateMarketData,'MarketData',,'all')
}
def updateMarketData(event){
emitEvent(event)
}
}
dummy = table(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
share streamTable(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as intput
share streamTable(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as output
schema = table(1:0, `eventType`eventFields`eventValuesTypeString`eventValueTypeID`eventValuesFormID, [STRING, STRING, STRING, INT[], INT[]])
insert into schema values("MarketData", "market,code,price,qty,eventTime", "STRING,STRING,DOUBLE,INT,TIMESTAMP", [18 18 16 4 12], [0 0 0 0 0])
inputSerializer = streamEventSerializer(name=`serInput, eventSchemes=schema, outputTable=intput, eventTimeField = "eventTime")
outputSerializer = streamEventSerializer(name=`serOutput, eventSchemes=schema, outputTable=output, eventTimeField = "eventTime")
engine = createCEPEngine('cep1', <MainMonitor()>, dummy, [MarketData], 1, 'eventTime', 10000, outputSerializer)
subscribeTable(,`intput, `subopt, 0, getStreamEngine('cep1'),true)
marketData1 = MarketData('sz', 's001', 9.8, 100)
marketData2 = MarketData('sz', 's002', 6.9, 100)
marketData3 = MarketData('sz', 's003', 4.8, 100)
marketData4 = MarketData('sz', 's004', 9.8, 100)
marketData5 = MarketData('sz', 's005', 9.8, 100)
appendEvent(inputSerializer, [marketData1,marketData2,marketData3,marketData4,marketData5])
API 端使用 EventSender
向 input 表写入数据。
// 定义 schema
EventSchema schema = new EventSchema();
schema.setEventType("MarketData");
schema.setFieldNames(Arrays.asList("market", "code", "price", "qty", "eventTime"));
schema.setFieldTypes(Arrays.asList(DT_STRING, DT_STRING, DT_DOUBLE, DT_INT, DT_TIMESTAMP));
schema.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR, DF_SCALAR, DF_SCALAR, DF_SCALAR));
List<EventSchema> eventSchemas = Collections.singletonList(scheme);
List<String> eventTimeFields = Collections.singletonList("eventTime");
List<String> commonFields = new ArrayList<>();
// 创建 EventSender
DBConnection conn = new DBConnection();
conn.connect(HOST, PORT, "admin", "123456");
EventSender sender = new EventSender(conn, "input", eventSchemas, eventTimeFields, commonFields);
// 准备数据:
List<Entity> attributes = new ArrayList<>();
attributes.add(new BasicString("sz"));
attributes.add(new BasicString("s001"));
attributes.add(new BasicDouble(9.8));
attributes.add(new BasicInt(100));
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,13,15,11,10,630)));
sender.sendEvent("MarketData", attributes);
CEP 引擎会去订阅 input 流表,在收到消息后,将消息输出到 output 流表。
API 端使用 EventClient
订阅 output 表。
EventSchema scheme = new EventSchema();
scheme.setEventType("MarketData");
scheme.setFieldNames(Arrays.asList("market", "code", "price", "qty", "eventTime"));
scheme.setFieldTypes(Arrays.asList(DT_STRING, DT_STRING, DT_DOUBLE, DT_INT, DT_TIMESTAMP));
scheme.setFieldForms(Arrays.asList(DF_SCALAR, DF_SCALAR, DF_SCALAR, DF_SCALAR, DF_SCALAR));
List<EventSchema> eventSchemas = Collections.singletonList(scheme);
List<String> eventTimeFields = Collections.singletonList("eventTime");
List<String> commonFields = new ArrayList<>();
// 自定义的回调函数
EventMessageHandler handler = new EventMessageHandler() {
@Override
public void doEvent(String eventType, List<Entity> attribute) {
// 自定义处理数据
System.out.println("eventType: " + eventType);
for (Entity entity : attribute) {
System.out.println(entity.getString());
}
System.out.println("\n");
}
};
// 创建事件订阅类
EventClient client = new EventClient(eventSchemas, eventTimeFields, commonFields);
// 订阅
client.subscribe(HOST, PORT, "output", "ss1", handler, 0, true, "admin", "123456");
Thread.sleep(10000);
// 取消订阅
client.unsubscribe(HOST, PORT, "output", "ss1");