CEP 引擎
创建 CEP 引擎
语法
createCEPEngine(name, monitors, dummyTable, eventSchema,
[deserializeParallelism=1], [timeColumn], [eventQueueDepth=1024], [outputTable],
[dispatchKey], [dispatchBucket], [useSystemTime=true])
参数
name 字符串标量,表示 CEP 引擎的名称。可包含字母,数字和下划线,但必须以字母开头。
monitors 元代码或元代码元组,可包含一个或多个 Monitor 类的构造函数调用。如果指定了多个 Monitor 类的构造函数调用,在创建 subEngine 时,将按照指定的顺序构造这些对象。Monitor 的创建方法参见 简单的 Monitor 实例。
dummyTable 一个表对象,和订阅的流数据表的 schema 一致。
class event1 {
var1:: STRING
var2:: INT
def event1(){
var1 = ""
var2 = 0
}
}
class event2 {
var1:: DOUBLE
var2:: INT
def event2(){
var1 = 0.0
var2 = 0
}
}
//指定为标量
eventSchema = event1
//指定为向量
eventSchema = [event1, event2]
deserializeParallelism 整型标量,用于指定反序化的线程数,默认值为1。
timeColumn 指定 dummyTable 中的时间列。指定后,这列将作为事件时间。
eventQueueDepth 整型标量,用于指定子引擎接收事件队列和输出队列的最大深度,默认值是1024。
outputTable 一个表对象,用于输出事件以供后续操作使用。如果调用了 emitEvent 接口,则需要指定该参数为
StreamEventSerializer
返回的表对象。
dispatchKey 字符串标量,用于指定事件中的属性,该属性中的每个唯一值被视为一个 key。若不指定 dispatchKey,则引擎将创建一个名为 name 的子引擎(与 CEP 引擎同名)。
dispatchBucket 整型标量,表示哈希分组的数量。如果指定该参数,引擎将对 dispatchKey 指定的字段进行哈希分组。
CEP 引擎根据 dispatchKey 和 dispatchBucket 自动创建子引擎,并行处理数据。各个子引擎之间互相独立。若未指定 dispatchKey,则 CEP 引擎只有一个处理线程,即只创建一个子引擎。
useSystemTime 布尔值,表示是否使用数据注入引擎时的系统时间进行计算。
-
当 useSystemTime = true 时(缺省值)时,CEP 引擎中的计算都基于数据注入引擎的时刻(以毫秒精度表示的本地系统时间)进行,与数据中的时间列无关。
-
当 useSystemTime = false 时,CEP 引擎中的计算都基于数据中的 timeColumn 列进行。
定义事件监听器
在 CEP 引擎中,通过 addEventListener
指定事件匹配规则和回调函数。事件监听器只能在 Monitor
内调用,其观察注入引擎的每个事件,当事件或事件模式与匹配规则相匹配时,会触发执行回调函数。
语法
addEventListener(handler, [eventType], [condition], [times="all"], [at], [wait], [within], [exceedTime])
参数
-
事件匹配:
handler 回调函数,如果指定了 eventType 则为一元函数,参数为匹配的事件。如果指定为按时间触发,则没有参数。
def actions1(stockVal){ //将事件类型 stockVal 的属性写入一个共享表 insert into objByName("sharedUSPrices") values([stockVal.price, stockVal.qty]) }
handler 中可以动态添加新的监听器:
def actions1(stockVal){ //将事件类型 stockVal 的属性写入一个共享表 insert into objByName("sharedUSPrices") values([stockVal.price, stockVal.qty]) //新增加一个事件侦听器 addEventListener(handler=action2, eventType="Stock", condition=<self.prices > stockVal.price>) }
eventType字符串标量,表示事件类型。若指定为 “any”,则表示任意事件。
condition 元代码类型,指定匹配的事件条件,即事件匹配规则。元代码的返回结果必须是布尔值。例如 eventType 为 Stock 时,指定 <Stock.price > 10 and Stock.qty < 10> 。
times 可以是正整数或者 ”all”,表示在 handler 被触发指定次数后自动删除监听,默认为 ”all”。例如,若设置为 5,表示 handler 被触发 5 次后,将删除该监听器;若设置为 ”all”,则会持续监听事件,对每个匹配的事件都触发 handler,直至引擎被删除。
-
按时间触发(此时不会进行事件匹配,因此不可指定 eventType):
at 一个长度为 6 的元组,用于指定触发 handler 的时间频率。其形式为 (seconds,minutes, hours, days_of_the_week,days_of_the_month, month ) ,其中各元素依次表示秒(必须指定)、分钟、小时、一周中的第几天、当月的第几天、月份。如果times="all”,则表示每月/日/周几/小时/分钟的第 seconds 秒触发 handler。例如:(0, 5, , , ),表示在每小时的 05 分触发 handler。如果 times 指定为具体数字,则 handler 只会被触发指定次数。
wait DURATION 类型,表示等待多长时间后触发 handler。如果 times=”all”,则表示每隔多久触发 handler。例如:wait = 60s, times =”all”,每隔 60秒触发一次 handler。如果 times 指定为具体数字,则 handler 只会被触发指定次数。
-
同时限定时间和事件:
within 仅在限定的时间内收到匹配的事件时才触发 handler。例如:eventType="tickets", within=60s ,表示60秒内匹配到事件 tickets,则触发 handler,否则删除这个监听器。
exceedTime 仅在限定的时间内没有匹配的事件时才触发 handler。例如:eventType="tickets", exceedTime=60s,表示若60秒内未匹配到事件 tickets,则触发 handler,否则删除这个监听器。
下面举例说明事件监听器的几种触发方式:
事件匹配:监听单一事件或者所有事件,并且限定事件条件 |
监听价格大于10.0 的股票。下例中 eventType 为事件,condition 为事件匹配条件,handler 为监听到符合条件的事件之后的回调函数。
监听所有的股票
监听任意事件
|
按时间触发 |
在固定时间触发,比如:在每天的 8:30 触发。
等待固定时间之后触发,比如:
|
同时限定时间和事件 |
在限定时间内匹配事件,比如在60秒内匹配到价格大于10.0的 Stock 事件,则执行回调函数。
在限定时间内未匹配事件,如在60秒内没有匹配到价格大于10.0的 Stock 事件,则执行回调函数。
|
追加事件
向 CEP 引擎注入事件有两种方法:
-
通过
appendEvent
接口,直接将事件实例写入 CEP 引擎。 -
将异构流表中的数据写入 CEP 引擎:通过
append!
/tableInsert
/insert into
直接向引擎插入一个异构流表。需要事先将事件通过replay
回放到一个异构流数据表或者通过 API 写入异构流表,然后通过subscribeTable
函数订阅该异构流表向引擎输入事件。
这两种方式的主要区别在于,appendEvent
写入事件不需要序列化和反序列化;而通过
append!
/ tableInsert
/ insert
into
写入事件需要进行序列化和反序列化,但这种方法可以将多个不同的事件类型写入到一个异构流数据表。
appendEvent
语法
appendEvent(engine, events)
参数
engine 引擎句柄。目前支持序列化引擎和 CEP 引擎。
events 事件类型,可以是事件类型实例或者字典。如果指定为字典,系统会根据键值对构造出事件实例,因此字典的键必须包含 eventType 和 eventSchema 中指定的 eventField。
示例
假如已定义的事件类型 Orders,其包含字段 sym,val0,val1,val2:
class Orders{
eventTime :: TIMESTAMP
sym :: STRING
val0 :: INT
val1 :: FLOAT
val2 :: DOUBLE
def Orders(s,v0,v1,v2){
sym = s
val0 = v0
val1 = v1
val2 = v2
eventTime = now()
}
}
创建CEP 引擎如下:
engine=createCEPEngine(name="test_CEP",monitors=<Monitor1()>,dummyTable=dummy,eventSchema=Orders,timeColumn='eventTime')
下面说明如何通过事件实例或字典向 CEP 引擎中追加事件。
-
events 指定为一个事件类型实例:
appendEvent(`test_CEP,Orders("b"+lpad(string(i),3,"0"),i,i*1,i*10))
-
events 指定为一个字典:
d=dict(['eventType',"sym", "val0","val1", "val2"],["Orders",'a000',5,float(3.6),double(5.3)]) appendEvent(`test_CEP,d)
路由事件
在 Monitor 中,事件通常有四种流向:插入事件输入队列的队尾、插入输入处理队列的队首、继续进行模式匹配、以及插入事件输出队列等待输出。其中,继续进行模式匹配是在监听器的 handler 中定义的另一个监听器。若要添加监听器,请参考上一节内容。本节将主要介绍将事件插入事件输入队列队尾、插入事件输入队列的队首以及插入输出队列的方法。
插入事件输入队列队尾
将事件插入到当前子引擎的事件处理队列的尾部。
语法
sendEvent(event)
参数
event 事件类型实例。
插入事件输入队列队首
将事件插入到当前子引擎的事件处理队列的队首。
语法
routeEvent(event)
参数
event 事件类型实例。
插入输出队列队尾
将事件插入到 CEP 引擎的事件输出队列的队尾。引擎会异步地将事件发送到输出队列。
语法
emitEvent(event, [eventTimeField])
参数
event 事件类型实例。
eventTimeField 字符串标量,表示事件中的时间字段名。若要指定此参数,event 必须包含时间字段。当 useSystemTime=true 时,输出事件中的时间为系统时间;否则,输出最新事件时间。
停止子引擎
语法
stopSubEngine()
详情
关闭执行此函数的 Monitor 实例所在的子引擎。在关闭子引擎前,将调用其内部所有 Monitor 实例中已声明的 onunload
方法。如果存在通过 spawn 产生的 Monitor 实例,将首先调用这些由 spawn 产生的 Monitor 实例中已声明的
onDestroy
方法。