Defining a CEP Engine

The CEP engine processes real-time events primarily by subscribing to heterogeneous stream tables (through subscribeTable). Event data can be written to these tables either by using the replay function or through APIs.

Note: To write events directly to the CEP engine, use the appendEvent function. This function enables direct appending of events to the event input queue.
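For example (a sketch assuming the Orders event class and the cep1 engine created in the Examples below; check the appendEvent reference page for its exact signature):

```dolphindb
// Sketch: append a single Orders event directly to the event input
// queue of the engine, bypassing the stream table subscription.
// Orders and the engine name "cep1" come from the Examples section.
appendEvent(getStreamEngine(`cep1), Orders("IBM", 100))
```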

Creating a CEP Engine

Syntax

createCEPEngine(name, monitors, dummyTable, eventSchema, [deserializeParallelism=1], [timeColumn], [eventQueueDepth=1024], [outputTable], [dispatchKey], [dispatchBucket], [useSystemTime=true], [eventTimeField="eventTime"])

Arguments

name is a string scalar indicating the name of the CEP engine. It can contain letters, digits, and underscores (_) and must start with a letter.

monitors is metacode or a tuple of metacode containing one or more constructors of the Monitor class. If multiple constructors are specified, the monitor objects are constructed in order. For instructions on how to define a monitor, refer to Defining Monitors.

dummyTable is a table object with the same schema as the subscribed stream table.

eventSchema is a class definition or a vector of class definitions for the event types to be processed (subscribed from APIs or plugins). For instructions on how to define an event, refer to Defining Events.

deserializeParallelism (optional) is an integer that specifies the number of workers to deserialize the subscribed stream table. The default value is 1.

timeColumn (optional) is a STRING scalar or vector indicating the time column(s) of dummyTable. If specified, it is used as the initial event time of the event streams.

eventQueueDepth (optional) is an integer that specifies the depth of the event input and output queues. The default value is 1024.

outputTable (optional) is a table object returned by streamEventSerializer, indicating the output table for further processing. It must be specified when emitEvent is called.
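A minimal sketch of wiring an output table (assuming the Orders and Trades classes and the replayOutput table defined in the Examples below; the streamEventSerializer parameter names here are assumptions, so check its reference page for the exact signature):

```dolphindb
// Hypothetical sketch: serialize events emitted via emitEvent into a
// heterogeneous table. The streamEventSerializer parameter names are
// assumptions; Orders, Trades, MainMonitor, and replayOutput come from
// the Examples section below.
share(streamTable(100:0, `eventType`eventBody, [STRING, BLOB]), "cepEmitOutput")
serOutput = streamEventSerializer(name=`serOutput, eventSchema=[Orders, Trades], outputTable=cepEmitOutput)
engine2 = createCEPEngine(name='cep2', monitors=<MainMonitor()>, dummyTable=replayOutput,
    eventSchema=[Orders, Trades], outputTable=serOutput)
```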

dispatchKey (optional) is a string scalar indicating an event field used to dispatch events to sub-engines.

  • If specified, the engine creates sub-engines based on the number of unique values of the event field.

  • If not specified, the engine only creates a single sub-engine with the same name as CEP engine (name).

dispatchBucket (optional) is an integer indicating the number of hash buckets. It is used to group the values of the specified event field (dispatchKey) with a hash algorithm. To specify this parameter, dispatchKey must be specified. If specified, the engine creates as many sub-engines as the number of buckets.
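The two dispatch modes can be sketched as follows (assuming the Orders and Trades classes, the MainMonitor class, and the replayOutput table from the Examples below; the engine name and bucket count are illustrative):

```dolphindb
// Sketch: dispatch incoming events to sub-engines by securityId.
// With dispatchBucket=8, securityId values are hashed into 8 buckets
// and one sub-engine is created per bucket; without dispatchBucket,
// one sub-engine is created per unique securityId value.
engine3 = createCEPEngine(name='cepDispatch', monitors=<MainMonitor()>,
    dummyTable=replayOutput, eventSchema=[Orders, Trades],
    dispatchKey="securityId", dispatchBucket=8)
```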

useSystemTime (optional) is a Boolean value indicating whether calculations are performed based on the system time (in milliseconds) at which each event is ingested into the engine. The default value is true. If set to false, calculations are performed based on timeColumn.

eventTimeField (optional) is a STRING scalar or vector indicating the time column(s) of events. It only takes effect when useSystemTime = false. It is a scalar if all events use the same time column name. Otherwise, it is a vector of the same length as eventSchema, where each element represents the time column for each event. This parameter is required if event streams are ingested into the CEP engine using appendEvent.
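To drive the engine by event time instead of system time, the parameters above combine as in this sketch (assuming the Orders and Trades classes, the MainMonitor class, and the replayOutput table from the Examples below; both event classes carry a time field, so a scalar eventTimeField suffices):

```dolphindb
// Sketch: event-time processing. useSystemTime=false makes the engine
// use each event's own "time" field (present in both Orders and Trades)
// rather than the ingestion time.
engine4 = createCEPEngine(name='cepEventTime', monitors=<MainMonitor()>,
    dummyTable=replayOutput, eventSchema=[Orders, Trades],
    timeColumn="time", useSystemTime=false, eventTimeField="time")
```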

Examples

Define the Orders and Trades classes and the MainMonitor monitor:

class Orders{
    time :: TIMESTAMP
    securityId :: STRING
    orderPrice :: DOUBLE
    qty :: INT
    def Orders(securityId_, qty_){
        securityId = securityId_
        qty = qty_
    }
}

class Trades{
    time :: TIMESTAMP
    securityId :: STRING
    tradePrice :: DOUBLE
    volume :: INT    
    def Trades(securityId_, volume_) {
        securityId = securityId_
        volume = volume_           
    }
}



class MainMonitor{
    orderQty :: INT

    def MainMonitor(){
        orderQty = 0
    }

    def action1(event) {
        orderQty += event.qty
        str1 = "Total qty of Orders: " + string(orderQty)
        writeLog(str1)
    }

    def onload() {
        addEventListener(handler=action1, eventType="Orders", times="all")
    }
}

Create a CEP engine cep1. Ensure that the schema of dummyTable matches that of the subscribed stream table (here, the replay output table replayOutput).

// define heterogeneous table replayOutput
share(streamTable(100:0, `time`eventType`eventBody, [TIMESTAMP, STRING, BLOB]), "replayOutput")
// create CEP engine
engine = createCEPEngine(name='cep1', monitors=<MainMonitor()>, dummyTable=replayOutput,
eventSchema=[Orders, Trades], timeColumn="time")

Subscribe to stream table replayOutput and set handler as cep1.

subscribeTable(tableName="replayOutput", actionName="replayOutput",
    handler=getStreamEngine(`cep1), msgAsTable=true)

Data replayed to the replayOutput table is automatically ingested into and processed by the CEP engine.

Note: For writing events to heterogeneous stream tables via APIs, see the Event Subscription and Sending section in the API guides.
  • Replay from memory.
    // data to be replayed
    ordersData = select securityId, orderTime, orderPrice, qty from loadText("/home/Data/orders.csv")
    tradesData = select securityId, tradeTime, tradePrice, volume from loadText("/home/Data/trades.csv")
    
    input_dict = dict(["Orders", "Trades"], [ordersData, tradesData])
    time_dict = dict(["Orders", "Trades"], [`orderTime, `tradeTime])
    
    // replay data
    replay(inputTables=input_dict, outputTables=replayOutput, timeColumn=time_dict)
  • Replay from database (Sample orders and trades).
    if(existsDatabase('dfs://eventDB')){
    	dropDatabase('dfs://eventDB')
    }
    
    create database "dfs://eventDB"
    partitioned by VALUE(2023.02.01..2023.03.10), HASH([SYMBOL, 20]),
    engine='TSDB'
    
    // import orders
    orderSchema = extractTextSchema("/home/Data/orders.csv")
    update orderSchema set type=`SYMBOL where name= `securityId
    orders=loadTextEx(dbHandle=database('dfs://eventDB'), tableName='orders',
    partitionColumns=`orderTime`securityId,sortColumns=`securityId`orderTime,
    filename="/home/Data/orders.csv",schema=orderSchema)
    
    // import trades
    tradeSchema = extractTextSchema("/home/Data/trades.csv")
    update tradeSchema set type=`SYMBOL where name= `securityId
    trades=loadTextEx(dbHandle=database('dfs://eventDB'), tableName='trades',
    partitionColumns=`tradeTime`securityId,sortColumns=`securityId`tradeTime,
    filename="/home/Data/trades.csv",schema=tradeSchema)
    
    ordersData = replayDS(sqlObj=<select securityId, orderTime, orderPrice, qty from loadTable('dfs://eventDB', `orders)>, dateColumn=`orderTime, timeColumn=`orderTime)
    tradesData = replayDS(sqlObj=<select securityId, tradeTime, tradePrice, volume from loadTable('dfs://eventDB', `trades)>, dateColumn=`tradeTime, timeColumn=`tradeTime)
    
    input_dict  = dict(["Orders", "Trades"], [ordersData, tradesData])
    date_dict = dict(["Orders", "Trades"], [`orderTime, `tradeTime])
    time_dict = dict(["Orders", "Trades"], [`orderTime, `tradeTime])
    
    // replay data
    replay(inputTables=input_dict, outputTables=replayOutput, dateColumn=date_dict, timeColumn=time_dict)

Stopping a Sub-Engine

To stop event processing within a specific sub-engine, call stopSubEngine() from within any of its monitor instances.

Before the sub-engine is stopped, the following actions occur:

  1. If there are spawned monitor instances, the engine first invokes the onDestroy() function (if defined) in those spawned monitor instances.

  2. The engine executes the onunload() function (if defined) of each monitor instance.

Syntax

stopSubEngine()

Arguments

None
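The shutdown sequence above can be sketched in a monitor that stops its own sub-engine once a condition is met (assuming the Orders event class from the Examples above; the 1000-share threshold is illustrative):

```dolphindb
// Sketch: a monitor that stops its sub-engine when the cumulative order
// quantity exceeds a threshold. onunload() runs as the sub-engine stops.
class ThresholdMonitor{
    orderQty :: INT

    def ThresholdMonitor(){
        orderQty = 0
    }

    def onOrder(event) {
        orderQty += event.qty
        if (orderQty > 1000) {
            writeLog("Threshold reached, stopping sub-engine")
            stopSubEngine()
        }
    }

    def onload() {
        addEventListener(handler=onOrder, eventType="Orders", times="all")
    }

    def onunload() {
        writeLog("Sub-engine stopped")
    }
}
```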