Routing Events

Events can be appended from outside the CEP engine context or routed within a CEP engine. Within a CEP engine context, the monitor instance serves as the control center for event processing: it defines how events are handled and where they are routed. The source monitor instance can route an event either to the event input queue, for further matching by another monitor instance, or directly to the event output queue. Events can also stay at the event matcher stage and continue to be matched against newly defined listeners (see Defining Event Listeners).

To Event Input Queue

DolphinDB provides two functions, routeEvent and sendEvent, that route events to the event input queue and enable communication among monitor instances.

To append events from outside the CEP engine context, use the appendEvent function, which injects events into the CEP engine's event input queue from an external context or application.

routeEvent

A routed event goes to the front of the input queue. The engine processes all routed events before it processes the next non-routed event on the input queue.

Syntax

routeEvent(event)

Arguments

event is an event instance.

sendEvent

An event sent by sendEvent goes to the end of the input queue. The engine processes events in the order they are sent to the input queue.

Syntax

sendEvent(event)

Arguments

event is an event instance.
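The difference between the two functions is easiest to see from inside a monitor. The following is a minimal sketch rather than a tested configuration: the Change event class, the Monitor1 class, and the handler names are hypothetical, and Orders is assumed to be the event class defined in Example 1 of the appendEvent section. The listener registration follows the addEventListener call used in Example 2.

```dolphindb
// Hypothetical follow-up event, introduced only for this sketch
class Change{
    sym :: STRING
    delta :: INT
    def Change(s, d){
        sym = s
        delta = d
    }
}

// Hypothetical monitor: turns each Orders event into a Change event
class Monitor1{
    handled :: INT   // number of Change events processed so far
    def Monitor1(){
        handled = 0
    }

    // forward declarations, so onload can reference the handlers
    def handleOrders(event)
    def handleChange(event)

    def onload(){
        addEventListener(handleOrders, 'Orders', , 'all')
        addEventListener(handleChange, 'Change', , 'all')
    }

    def handleOrders(event){
        if (event.val0 > 100){
            // front of the input queue: processed before the next non-routed event
            routeEvent(Change(event.sym, event.val0))
        } else {
            // end of the input queue: processed after the events already queued
            sendEvent(Change(event.sym, event.val0))
        }
    }

    def handleChange(event){
        handled = handled + 1
    }
}
```

In this sketch the threshold of 100 is arbitrary. It only illustrates the design choice: use routeEvent when the follow-up event should be handled before anything already waiting in the input queue, and sendEvent when arrival order should be preserved.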

appendEvent

The appendEvent function appends events directly to the event input queue, avoiding the serialization and deserialization required when writing data through heterogeneous tables.

Syntax

appendEvent(engine, events)

Arguments

engine is the engine object returned by createCEPEngine or streamEventSerializer.

events is an event instance (a class object) or a dictionary. If it is a dictionary, the event instance is constructed automatically from the key-value pairs provided. The keys of the dictionary must include the event type (as specified in eventSchema) and all of its event fields.

Examples

Example 1. Append Orders events

// define an event Orders
class Orders{
    eventTime :: TIMESTAMP 
    sym :: STRING
    val0 :: INT
    val1 :: FLOAT
    val2 :: DOUBLE
    def Orders(s,v0,v1,v2){
        sym = s
        val0 = v0
        val1 = v1
        val2 = v2
        eventTime = now()
    }
}

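// create a CEP engine
// Monitor1 and the Change event class are assumed to be defined elsewhere
dummy = table(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
engine=createCEPEngine(name="test_CEP",monitors=<Monitor1()>,dummyTable=dummy,eventSchema=[Orders,Change],timeColumn='eventTime')

// if events is a class instance
i = 1   // sample index used to build the symbol and field values
appendEvent(engine,Orders("b"+lpad(string(i),3,"0"),i,i*1,i*10))
// if events is a dictionary
d=dict(['eventType',"sym", "val0","val1", "val2"],["Orders",'a000',5,float(3.6),double(5.3)])
appendEvent(engine,d)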

Example 2. Specify event time with eventTimeField for events appended with appendEvent.

class Test{
    key :: STRING
    et :: TIMESTAMP
    def Test(c){
        key = c
        et = now()   
    }
}

class MarketData{
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    et :: TIMESTAMP
    def MarketData(m,c,p,q){
        market = m
        code = c
        price = p
        qty = q
        et = now()   
    }
}

class MainMonitor{
    streamMinuteBar_1min :: ANY // 1-min OHLC bar
    tsAggrOHLC :: ANY // time-series engine
    def MainMonitor(){
        colNames = ["time","symbol","open","max","min","close","volume","amount","ret","vwap"]
        colTypes = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE]
        streamMinuteBar_1min = table(10000:0,colNames, colTypes)
    }

    def updateMarketData(event)
    // monitor market data, and calculate 1-min OHLC bars with time-series engine
    def onload(){
        addEventListener(updateMarketData,'MarketData',,'all')
        colNames=["symbol","time","price","type","volume"]
        colTypes=[SYMBOL, TIMESTAMP, DOUBLE, STRING, INT]
        dummy = table(10000:0,colNames,colTypes) 
        colNames = ["time","symbol","open","max","min","close","volume","amount","ret","vwap"]
        colTypes = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE]
        output = table(10000:0,colNames, colTypes)
        tsAggrOHLC = createTimeSeriesEngine(name="tsAggrOHLC", windowSize=60000, step=60000, metrics=<[first(price) as open ,max(price) as max,min(price) as min ,last(price) as close ,sum(volume) as volume ,wsum(volume, price) as amount ,((last(price)-first(price))/first(price)) as ret, (wsum(volume, price)/sum(volume)) as vwap]>, dummyTable=dummy, outputTable=streamMinuteBar_1min, timeColumn='time', useSystemTime=false, keyColumn="symbol", fill=`none)
    }

    def updateMarketData(event){
        tsAggrOHLC.append!(table(event.code as symbol, event.et as time, event.price as price, event.market as type, event.qty as volume))
    }
}
dummy = table(array(TIMESTAMP, 0) as eventTime,array(STRING, 0) as eventType,  array(BLOB, 0) as blobs)
engine = createCEPEngine(name='cep1', monitors=<MainMonitor()>, dummyTable=dummy, eventSchema=[MarketData,Test],dispatchKey='key',eventTimeField='et',useSystemTime=false,timeColumn="eventTime") 

data = Test("hk")
// append events to CEP engine
appendEvent(engine, data)

data = MarketData("hk","e", 1.0, 1)

// append events to CEP engine
appendEvent(engine, data)

To Event Output Queue

emitEvent

An event sent by emitEvent goes asynchronously to the end of the output queue.

Syntax

emitEvent(event, [eventTimeField])

Arguments

event is an event instance.

eventTimeField is an optional string scalar indicating the time field of the event. To specify this parameter, event must contain a time field. If useSystemTime is set to true when creating the engine, output events are timestamped with the system time. If it is set to false, they are timestamped with the latest event time.
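As an illustration of the syntax above, the sketch below emits an event from a listener callback. It is a hedged example under stated assumptions: the Alert event class, the AlertMonitor class, the handler name, and the price threshold are all hypothetical, and MarketData is assumed to be the event class defined in Example 2. Engine creation, including how the engine's output queue is connected to a destination, is not shown.

```dolphindb
// Hypothetical output event, introduced only for this sketch
class Alert{
    code :: STRING
    price :: DOUBLE
    et :: TIMESTAMP
    def Alert(c, p, t){
        code = c
        price = p
        et = t
    }
}

// Hypothetical monitor: emits an Alert when a price exceeds a threshold
class AlertMonitor{
    threshold :: DOUBLE
    def AlertMonitor(){
        threshold = 100.0   // arbitrary value for illustration
    }

    // forward declaration, so onload can reference the handler
    def onMarketData(event)

    def onload(){
        addEventListener(onMarketData, 'MarketData', , 'all')
    }

    def onMarketData(event){
        if (event.price > threshold){
            // send the Alert to the end of the output queue;
            // "et" names the time field of Alert (the eventTimeField argument)
            emitEvent(Alert(event.code, event.price, event.et), "et")
        }
    }
}
```

Passing "et" is only meaningful because Alert carries a time field. If eventTimeField is omitted, the call reduces to emitEvent(event).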