createCEPEngine
语法
createCEPEngine(name, monitors, dummyTable, eventSchema,
[deserializeParallelism=1], [timeColumn], [eventQueueDepth=1024], [outputTable],
[dispatchKey], [dispatchBucket], [useSystemTime=true],
[eventTimeField="eventTime"])
详情
创建一个复杂事件处理(CEP)引擎实例,用于处理实时事件流。
参数
name 字符串标量,表示 CEP 引擎的名称。可包含字母,数字和下划线,但必须以字母开头。
monitors 元代码或元代码元组,可包含一个或多个 Monitor 类的构造函数调用。如果指定了多个 Monitor 类的构造函数调用,在创建 subEngine 时,将按照指定的顺序构造这些对象。Monitor 的创建方法参见 简单的 Monitor 实例。
dummyTable 一个表对象,和订阅的流数据表的 schema 一致。
eventSchema 事件类型类的声明,用于指定为外部输入的事件类型,可以是一个标量或向量。例如:
class event1 {
var1:: STRING
var2:: INT
def event1(){
var1 = ""
var2 = 0
}
}
class event2 {
var1:: DOUBLE
var2:: INT
def event2(){
var1 = 0.0
var2 = 0
}
}
//指定为标量
eventSchema = event1
//指定为向量
eventSchema = [event1, event2]
deserializeParallelism 整型标量,用于指定反序化的线程数,默认值为1。
timeColumn 可选参数,字符串标量或向量,用于指定 dummyTable 中的时间列。指定后,通过事件流序列化器(Stream Event Serializer)向 CEP 引擎插入事件时,该列将作为事件时间。
eventQueueDepth 整型标量,用于指定子引擎接收事件队列和输出队列的最大深度,默认值是1024。
outputTable 一个表对象,用于输出事件以供后续操作使用。如果调用了 emitEvent 接口,则需要指定该参数为
StreamEventSerializer
返回的表对象。
dispatchKey 字符串标量,用于指定事件中的属性,该属性中的每个唯一值被视为一个 key。若不指定 dispatchKey,则引擎将创建一个名为 name 的子引擎(与 CEP 引擎同名)。
dispatchBucket 整型标量,表示哈希分组的数量。如果指定该参数,引擎将对 dispatchKey 指定的字段进行哈希分组。
CEP 引擎根据 dispatchKey 和 dispatchBucket 自动创建子引擎,并行处理数据。各个子引擎之间互相独立。若未指定 dispatchKey,则 CEP 引擎只有一个处理线程,即只创建一个子引擎。
useSystemTime 布尔值,表示是否使用数据注入引擎时的系统时间进行计算。
- 当 useSystemTime = true 时(缺省值)时,CEP 引擎中的计算都基于数据注入引擎的时刻(以毫秒精度表示的本地系统时间)进行,与数据中的时间列无关。
- 当 useSystemTime = false 时,CEP 引擎中的计算都基于数据中的 timeColumn 列进行。
eventTimeField 可选参数,字符串标量或向量,用于指定事件中的时间字段,仅在 useSystemTime = false
有效。如果所有事件的时间字段名都相同,则 eventTimeField 是一个标量;否则,eventTimeField 是一个向量,其长度和
eventSchema 的长度相同,每个元素分别代表每个事件的时间字段。通过 appendEvent
直接向
CEP 引擎插入事件时,必须通过该参数指定事件的时间。
例子
定义事件 Orders,其包含字段 sym,val0,val1,val2:
class Orders{
eventTime :: TIMESTAMP
sym :: STRING
val0 :: INT
val1 :: FLOAT
val2 :: DOUBLE
def Orders(s,v0,v1,v2){
sym = s
val0 = v0
val1 = v1
val2 = v2
eventTime = now()
}
}
定义一个简单的 monitor:
class mainMonitor:CEPMonitor {
ordersTable :: ANY // 定义成员变量 ordersTable,不限制数据类型和形式
def mainMonitor(){
ordersTable = array(ANY, 0) // 这里确定 ordersTable 为一个元组
}
def updateOrders(event) {
ordersTable.append!([event.sym, event.val0, event.val1, event.val2])
}
def onload(){
addEventListener(updateOrders, 'Orders',,'all')
}
}
创建 CEP 引擎如下:
dummy = table(array(TIMESTAMP, 0) as eventTime,array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
engine=createCEPEngine(name="test_CEP",monitors=<mainMonitor()>,dummyTable=dummy,eventSchema=Orders,timeColumn='eventTime')
相关函数:addEventListener, dropStreamEngine, getCEPEngineStat, stopSubEngine