Routing Events
Events can be added outside the CEP engine context or routed within a CEP engine. Within a CEP engine context, the monitor instance serves as the control center for event processing, defining how events are handled and where they should be routed. The source monitor instance has the capability to route an event either to the event input queue for further matching with another monitor instance or directly to the event output queue. Additionally, events can stay at the event matcher stage and continue being matched against newly defined listeners (see Defining Event Listeners).
To Event Input Queue
DolphinDB provides two functions, routeEvent
and
sendEvent
, to route events to event input queue and achieve
communication among monitor instances.
If you need to append events from outside the CEP engine context, the
appendEvent
function can be used, which enables you to inject
events into the CEP engine's event input queue from an external context or
application.
routeEvent
A routed event goes to the front of the input queue. The engine processes all routed events before it processes the next non-routed event on the input queue.
Syntax
routeEvent(event)
Arguments
event is an event instance.
sendEvent
An event sent by sendEvent
goes to the end of the input queue.
The engine processes events in the order they are sent to the input queue.
Syntax
sendEvent(event)
Arguments
event is an event instance.
appendEvent
The appendEvent
function allows direct event appending to event
input queue, avoiding serialization and deserialization compared to writing data
through heterogeneous tables.
Syntax
appendEvent(engine, events)
Arguments
engine is the engine object returned by createCEPEngine
or streamEventSerializer
.
events is a class object of the event instance or a dictionary. If it is a dictionary, the event instances will be automatically constructed with the key-value pairs provided. The keys of the dictionary must include the event type (specified with eventSchema) and all of its event fields.
Examples
Example 1. Append Orders events
// define an event Orders
class Orders{
eventTime :: TIMESTAMP
sym :: STRING
val0 :: INT
val1 :: FLOAT
val2 :: DOUBLE
def Orders(s,v0,v1,v2){
sym = s
val0 = v0
val1 = v1
val2 = v2
eventTime = now()
}
}
// create a CEP engine
engine=createCEPEngine(name="test_CEP",monitors=<Monitor1()>,dummyTable=dummy,eventSchema=[Orders,Change],timeColumn='eventTime')
// if events is a class instance
appendEvent(`test_CEP,Orders("b"+lpad(string(i),3,"0"),i,i*1,i*10))
// if events is a dictionary
d=dict(['eventType',"sym", "val0","val1", "val2"],["Orders",'a000',5,float(3.6),double(5.3)])
appendEvent(`test_CEP,d)
Example 2. Specify event time with eventTimeField for events appended with
appendEvent
.
class Test{
key :: string
et :: TIMESTAMP
def Test(c){
key = c
et = now()
}
}
class MarketData{
market :: STRING
code :: STRING
price :: DOUBLE
qty :: INT
et :: TIMESTAMP
def MarketData(m,c,p,q){
market = m
code = c
price = p
qty = q
et = now()
}
}
class MainMonitor{
streamMinuteBar_1min :: ANY // 1-min OHLC bar
tsAggrOHLC :: ANY // time-series engine
def MainMonitor(){
colNames = ["time","symbol","open","max","min","close","volume","amount","ret","vwap"]
colTypes = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE]
streamMinuteBar_1min = table(10000:0,colNames, colTypes)
}
def updateMarketData(event)
// monitor market data, and calculate 1-min OHLC bars with time-series engine
def onload(){
addEventListener(updateMarketData,'MarketData',,'all')
colNames=["symbol","time","price","type","volume"]
colTypes=[SYMBOL, TIMESTAMP, DOUBLE, STRING, INT]
dummy = table(10000:0,colNames,colTypes)
colNames = ["time","symbol","open","max","min","close","volume","amount","ret","vwap"]
colTypes = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE]
output = table(10000:0,colNames, colTypes)
tsAggrOHLC = createTimeSeriesEngine(name="tsAggrOHLC", windowSize=60000, step=60000, metrics=<[first(price) as open ,max(price) as max,min(price) as min ,last(price) as close ,sum(volume) as volume ,wsum(volume, price) as amount ,(last(price)-first(price)/first(price)) as ret, (wsum(volume, price)/sum(volume)) as vwap]>, dummyTable=dummy, outputTable=streamMinuteBar_1min, timeColumn='time', useSystemTime=false, keyColumn="symbol", fill=`none)
}
def updateMarketData(event){
tsAggrOHLC.append!(table(event.code as symbol, event.et as time, event.price as price, event.market as type, event.qty as volume))
}
}
dummy = table(array(TIMESTAMP, 0) as eventTime,array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
engine = createCEPEngine(name='cep1', monitors=<MainMonitor()>, dummyTable=dummy, eventSchema=[MarketData,Test],dispatchKey='key',eventTimeField='et',useSystemTime=false,timeColumn="eventTime")
data = Test("hk")
// append events to CEP engine
appendEvent(engine, data)
data = MarketData("hk","e", 1.0, 1)
// append events to CEP engine
appendEvent(engine, data)
To Event Output Queue
emitEvent
An event sent by emitEvent
asychronously goes to the end of the
output queue.
Syntax
emitEvent(event, [eventTimeField])
Arguments
event is an event instance.
eventTimeField is a string scalar indicating the time field of the event. To specify this parameter, event must contain a time field. If useSystem is set to true when creating the engine, the output events will be timestamped with the system time. If false, it will be output with the latest event time.