createDataViewEngine

语法

createDataViewEngine(name, outputTable, keyColumns, timeColumn, [useSystemTime=true], [throttle])

详情

该函数用于在 CEP 中创建一个 DataView 引擎。该引擎负责维护指定监控值的最新快照,并将其输出到目标表(通常为流表),供其他程序订阅。通过该函数,用户可以在 CEP 引擎运行过程中实时监控关键变量的变化趋势。

参数

name 字符串,表示 DataView 引擎的名称,可包含字母,数字和下划线,但必须以字母开头。

outputTable 一个表,可以是内存表或分布式表,用于存储 DataView 引擎中的数据。如果需要前端展示实时数据,或绘制数据变化趋势图,则 outputTable 必须指定为一个流表。

keyColumns 字符串标量或向量,为 outputTable 中的列名。引擎将使用指定列中数据的唯一(组合)值作为引擎的键值,对于每个键值,引擎都只保留 1 条数据。

timeColumn 一个字符串,表示指定 outputTable 中时间列的名称。

useSystemTime 布尔值,表示是否使用数据注入引擎时的系统时间作为输出的时间列。

  • useSystemTime=true,输出表中的时间列为系统时间,此时数据中不能包含时间列。
  • useSystemTime=false,输出表中的时间列为数据中的时间,此时需要写入的数据中包含时间。

throttle DURATION 类型,用于设置 DataView 引擎输出数据到 outputTable 的时间间隔。

返回值

一个键值表。该表记录了每个键值对应的最新记录。

例子

class trades{
    trader :: STRING
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    eventTime :: TIMESTAMP
    def trades(t, m, c, p, q){
        trader = t
        market = m
        code = c
        price = p
        qty = q
        eventTime = now()
    }
}

class mainMonitor:CEPMonitor{
    tradesTable :: ANY
    isBusy :: BOOL
    def mainMonitor(){
        tradesTable = array(ANY, 0)
        isBusy = false
    }

    def updateTrades(event)

    def updateTrades2(event)

    def unOnload(){
        undef('traderDV', SHARED)
    }

    def onload(){
        addEventListener(updateTrades, `trades, , "all",,,,,"trades1")
        addEventListener(updateTrades2, `trades, , "all",,,,,"trades2")
        traderDV = streamTable(array(STRING, 0) as trader, array(STRING, 0) as market, array(SYMBOL, 0) as code, array(DOUBLE, 0) as price, array(INT, 0) as qty, array(INT, 0) as tradeCount, array(BOOL, 0) as busy, array(DATE, 0) as orderDate, array(TIMESTAMP, 0) as updateTime)
        share(traderDV, 'traderDV')
        createDataViewEngine('traderDV', objByName('traderDV'), `trader, `updateTime, true)
    }

    def updateTrades(event) {
        tradesTable.append!([event.trader, event.market, event.code, event.price, event.qty])
        getDataViewEngine('traderDV').append!(table(event.trader as trader, string() as market, string() as code, 0.0 as price, 0 as qty, 0 as tradeCount, false as busy, date(event.eventTime) as orderDate))
        updateDataViewItems('traderDV', event.trader, ['market', 'code', 'price', 'qty', 'tradeCount'], [event.market, event.code, event.price, event.qty, tradesTable.size()])
    }

    def updateTrades2(event) {
        tradesTable.append!([event.trader, event.market, event.code, event.price, event.qty])
        getDataViewEngine('traderDV').append!(table(event.trader as trader, string() as market, string() as code, 0.0 as price, 0 as qty, 0 as tradeCount, false as busy, date(event.eventTime) as orderDate))
        updateDataViewItems('traderDV', event.trader, ['market', 'code', 'price', 'qty', 'tradeCount'], [event.market, event.code, event.price, event.qty, tradesTable.size()])
    }
}

dummy = table(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
engineCep = createCEPEngine('cep1', <mainMonitor()>, dummy, [trades], 1, 'eventTime', 10000)
trade1 = trades('t1', 'sz', 's001', 11.0, 10)
go
appendEvent(engineCep, trade1)
// 查看 monitor
monitors = getCEPEngineMonitor('cep1',"cep1","mainMonitor")
// 查看 listener
listeners = monitors.getEventListener()
print(listeners)
// 终止 listener
listeners['trades1'].terminate()
print(listeners)

相关函数createCEPEngine, deleteDataViewItems, dropDataViewEngine, getDataViewEngine