createDataViewEngine
语法
createDataViewEngine(name, outputTable, keyColumns, timeColumn,
[useSystemTime=true], [throttle])
详情
该函数用于在 CEP 中创建一个 DataView 引擎。该引擎负责维护指定监控值的最新快照,并将其输出到目标表(通常为流表),供其他程序订阅。通过该函数,用户可以在 CEP 引擎运行过程中实时监控关键变量的变化趋势。
参数
name 字符串,表示 DataView 引擎的名称,可包含字母,数字和下划线,但必须以字母开头。
outputTable 一个表,可以是内存表或分布式表,用于存储 DataView 引擎中的数据。如果需要前端展示实时数据,或绘制数据变化趋势图,则 outputTable 必须指定为一个流表。
keyColumns 字符串标量或向量,为 outputTable 中的列名。引擎将使用指定列中数据的唯一(组合)值作为引擎的键值,对于每个键值,引擎都只保留 1 条数据。
timeColumn 一个字符串,表示指定 outputTable 中时间列的名称。
useSystemTime 布尔值,表示是否使用数据注入引擎时的系统时间作为输出的时间列。
- 若 useSystemTime=true,输出表中的时间列为系统时间,此时数据中不能包含时间列。
- 若 useSystemTime=false,输出表中的时间列为数据中的时间,此时需要写入的数据中包含时间。
throttle DURATION 类型,用于设置 DataView 引擎输出数据到 outputTable 的时间间隔。
返回值
一个键值表。该表记录了每个键值对应的最新记录。
例子
class trades{
trader :: STRING
market :: STRING
code :: STRING
price :: DOUBLE
qty :: INT
eventTime :: TIMESTAMP
def trades(t, m, c, p, q){
trader = t
market = m
code = c
price = p
qty = q
eventTime = now()
}
}
class mainMonitor:CEPMonitor{
tradesTable :: ANY
isBusy :: BOOL
def mainMonitor(){
tradesTable = array(ANY, 0)
isBusy = false
}
def updateTrades(event)
def updateTrades2(event)
def unOnload(){
undef('traderDV', SHARED)
}
def onload(){
addEventListener(updateTrades, `trades, , "all",,,,,"trades1")
addEventListener(updateTrades2, `trades, , "all",,,,,"trades2")
traderDV = streamTable(array(STRING, 0) as trader, array(STRING, 0) as market, array(SYMBOL, 0) as code, array(DOUBLE, 0) as price, array(INT, 0) as qty, array(INT, 0) as tradeCount, array(BOOL, 0) as busy, array(DATE, 0) as orderDate, array(TIMESTAMP, 0) as updateTime)
share(traderDV, 'traderDV')
createDataViewEngine('traderDV', objByName('traderDV'), `trader, `updateTime, true)
}
def updateTrades(event) {
tradesTable.append!([event.trader, event.market, event.code, event.price, event.qty])
getDataViewEngine('traderDV').append!(table(event.trader as trader, string() as market, string() as code, 0.0 as price, 0 as qty, 0 as tradeCount, false as busy, date(event.eventTime) as orderDate))
updateDataViewItems('traderDV', event.trader, ['market', 'code', 'price', 'qty', 'tradeCount'], [event.market, event.code, event.price, event.qty, tradesTable.size()])
}
def updateTrades2(event) {
tradesTable.append!([event.trader, event.market, event.code, event.price, event.qty])
getDataViewEngine('traderDV').append!(table(event.trader as trader, string() as market, string() as code, 0.0 as price, 0 as qty, 0 as tradeCount, false as busy, date(event.eventTime) as orderDate))
updateDataViewItems('traderDV', event.trader, ['market', 'code', 'price', 'qty', 'tradeCount'], [event.market, event.code, event.price, event.qty, tradesTable.size()])
}
}
dummy = table(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
engineCep = createCEPEngine('cep1', <mainMonitor()>, dummy, [trades], 1, 'eventTime', 10000)
trade1 = trades('t1', 'sz', 's001', 11.0, 10)
go
appendEvent(engineCep, trade1)
// 查看 monitor
monitors = getCEPEngineMonitor('cep1',"cep1","mainMonitor")
// 查看 listener
listeners = monitors.getEventListener()
print(listeners)
// 终止 listener
listeners['trades1'].terminate()
print(listeners)
相关函数:createCEPEngine, deleteDataViewItems, dropDataViewEngine, getDataViewEngine