Application in Streaming
Based on the DolphinDB streaming framework, CEP engine can achieve seamless integration of various streaming engines.
The following example demonstrates how the CEP engine processes stock events. A
time-series engine is used to calculate OHLC. The "streamMinuteBar_1min" variable of the
MainMonitor
represents a table, which will be populated with the
results of the time-series engine's calculations on the incoming stock events.
class MarketData{
market :: STRING
code :: STRING
price :: DOUBLE
qty :: INT
eventTime :: TIMESTAMP
def MarketData(m,c,p,q){
market = m
code = c
price = p
qty = q
eventTime = now()
}
}
class MainMonitor{
streamMinuteBar_1min :: ANY // store the OHLC results
tsAggrOHLC :: ANY // time-series engine
def MainMonitor(){
colNames = ["time","symbol","open","max","min","close","volume","amount","ret","vwap"]
colTypes = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE]
streamMinuteBar_1min = table(10000:0,colNames, colTypes)
}
def updateMarketData(event)
// define a listener and a time-series engine
def onload(){
addEventListener(updateMarketData,'MarketData',,'all')
colNames=["symbol","time","price","type","volume"]
colTypes=[SYMBOL, TIMESTAMP, DOUBLE, STRING, INT]
dummy = table(10000:0,colNames,colTypes)
colNames = ["time","symbol","open","max","min","close","volume","amount","ret","vwap"]
colTypes = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE]
output = table(10000:0,colNames, colTypes)
tsAggrOHLC = createTimeSeriesEngine(name="tsAggrOHLC", windowSize=60000, step=60000, metrics=<[first(price) as open ,max(price) as max,min(price) as min ,last(price) as close ,sum(volume) as volume ,wsum(volume, price) as amount ,(last(price)-first(price)/first(price)) as ret, (wsum(volume, price)/sum(volume)) as vwap]>, dummyTable=dummy, outputTable=streamMinuteBar_1min, timeColumn='time', useSystemTime=false, keyColumn="symbol", fill=`none)
}
def updateMarketData(event){
tsAggrOHLC.append!(table(event.code as symbol, event.eventTime as time, event.price as price, event.market as type, event.qty as volume))
}
}
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
engine = createCEPEngine(name='cep1', monitors=<MainMonitor()>, dummyTable=dummy, eventSchema=[MarketData])