createDataViewEngine

Syntax

createDataViewEngine(name, outputTable, keyColumns, timeColumn, [useSystemTime=true], [throttle],[includeOperationType=false])

Details

The createDataViewEngine function creates a data view engine and returns a keyed table with keyColumns as the key. The table retains only the latest record for each key. The engine maintains an up-to-date snapshot of each monitored variable and exports the data to a target table, usually a stream table, which other clients can subscribe to.

Parameters

name is a string indicating the name of the data view engine. It consists of letters, digits, or underscores (_) and must start with a letter.

outputTable is an in-memory table or a DFS table. If you want to display real-time data, or to graph trends of the data, it must be a stream table.

keyColumns is a STRING scalar or vector that specifies one or more columns of outputTable as the key columns. For each unique key value, only the latest record is retained.

timeColumn is a STRING scalar which specifies the time column in the output table.

useSystemTime (optional) is a Boolean value indicating whether to use the system time as the time column of the output table. The default value is true.

  • If the input data does not contain a time column, set useSystemTime to true. The time column of outputTable is then the system time.

  • If the input data contains a time column, set useSystemTime to false. The time column of outputTable then uses the timestamp of each record.
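
The useSystemTime=false case can be sketched as follows. This is a minimal illustration modeled on Example 1 in this document, not a definitive implementation. The names TimedOrders, TimedMonitor, timedOrderDV, and cep2 are hypothetical, and the sketch assumes that the table appended to the engine carries a column matching timeColumn, in the same column order as outputTable.

```dolphindb
// Hypothetical event class: the Orders fields plus an event timestamp
class TimedOrders{
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    updateTime :: TIMESTAMP
    def TimedOrders(m,c,p,q,t){
        market = m
        code = c
        price = p
        qty = q
        updateTime = t
    }
}
class TimedMonitor:CEPMonitor {
    def TimedMonitor(){
    }
    def onunload(){ undef('timedOrderDV', SHARED) }
    def checkOrders(newOrder)
    def onload(){
        addEventListener(checkOrders,'TimedOrders',,'all')
        timedOrderDV = streamTable(array(STRING, 0) as market, array(STRING, 0) as code, array(DOUBLE, 0) as price, array(INT, 0) as qty, array(TIMESTAMP, 0) as updateTime)
        share(timedOrderDV,'timedOrderDV')
        // useSystemTime=false: updateTime is taken from each appended record
        createDataViewEngine(name='timedOrderDV', outputTable=objByName('timedOrderDV'), keyColumns=`code, timeColumn=`updateTime, useSystemTime=false)
    }
    def checkOrders(newOrder){
        // Assumption: the input table includes the time column named by timeColumn
        getDataViewEngine('timedOrderDV').append!(table(newOrder.market as market, newOrder.code as code, newOrder.price as price, newOrder.qty as qty, newOrder.updateTime as updateTime))
    }
}
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
engine = createCEPEngine(name='cep2', monitors=<TimedMonitor()>, dummyTable=dummy, eventSchema=[TimedOrders])
engine.appendEvent(TimedOrders("m1", "c1", 10.0, 100, 2026.02.01T09:30:00.000))
```

Under these assumptions, the updateTime value in timedOrderDV would come from the event itself rather than from the server clock, which is useful when replaying historical data.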

throttle (optional) is a DURATION scalar that specifies the time interval between successive writes to outputTable.

includeOperationType (optional) is a Boolean value indicating whether the output includes a column specifying the type of operation performed on each record. The default value is false.

When set to true, the output table will include an additional leading column of type CHAR that specifies the operation applied to each record:

  • 'A': insert
  • 'U': update
  • 'D': delete

Note: Deleted records are not emitted by default. When includeOperationType is enabled, deleted records are emitted with operation type 'D', which may result in a different number of output records.
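
Because deletions arrive as ordinary rows when includeOperationType is enabled, a consumer may need to rebuild the current snapshot from the emitted change records. A minimal sketch, assuming the output table is the shared stream table orderDV from Example 1 and that its leading CHAR column is named type:

```dolphindb
// Keep only the last emitted row for each code
latest = select * from orderDV context by code limit -1
// Drop keys whose most recent operation is a delete
select * from latest where type != 'D'
```

With the four rows shown in Example 1, this query would be expected to keep only the c1 row, because the last operation recorded for c2 is 'D'.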

Returns

Returns a keyed table with keyColumns as the key.

Examples

Example 1. This example demonstrates how to use a data view engine inside a CEP (Complex Event Processing) engine to maintain the latest state of stock orders in real time, including order insertion and deletion operations.

class Orders{
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def Orders(m,c,p,q){
        market = m
        code = c
        price = p
        qty = q
    }
}
class DeleteOrder{
    code :: STRING
    def DeleteOrder(c){
        code = c
    }
}
// Define the monitor
class MainMonitor:CEPMonitor {
    def MainMonitor(){
    }
    // Automatically called when the engine is deleted: drops the shared stream table
    def onunload(){ undef('orderDV', SHARED) }
    def checkOrders(newOrder)
    def deleteOrder(order)
    // Create a data view engine with the primary key set to the code column to maintain the latest order information for each stock
    def onload(){
        addEventListener(checkOrders,'Orders',,'all') 
        orderDV = streamTable(array(CHAR, 0) as type, array(STRING, 0) as market, array(STRING, 0) as code, array(DOUBLE, 0) as price, array(INT, 0) as qty, array(TIMESTAMP, 0) as updateTime)
        share(orderDV,'orderDV')
        createDataViewEngine('orderDV', objByName('orderDV'), `code, `updateTime, true, ,true)
        addEventListener(deleteOrder,'DeleteOrder',,'all') 
    }
    // Update the latest order information for each stock
    def checkOrders(newOrder){
        getDataViewEngine('orderDV').append!(table(newOrder.market as market, newOrder.code as code, newOrder.price as price, newOrder.qty as qty))
    }
    def deleteOrder(order){
        deleteDataViewItems('orderDV', order.code)
    }
}
// Create the CEP engine
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
try{dropStreamEngine('cep1')}catch(ex){print(ex)}
engine = createCEPEngine(name='cep1', monitors=<MainMonitor()>, dummyTable=dummy, eventSchema=[Orders,DeleteOrder])
engine.appendEvent(Orders("m1", "c1", 10.0, 100))
engine.appendEvent(Orders("m1", "c2", 10.0, 100))
engine.appendEvent(Orders("m1", "c2", 9.5, 100))
engine.appendEvent(DeleteOrder("c2"))

// Query order data from orderDV
select * from orderDV

type market code price qty updateTime
A    m1     c1   10    100 2026.02.01 14:53:12.928
A    m1     c2   10    100 2026.02.01 14:53:12.928
U    m1     c2   9.5   100 2026.02.01 14:53:12.928
D    m1     c2   9.5   100 2026.02.01 14:53:12.928

If the operation type is not required, set includeOperationType=false and remove the type column from the schema of the orderDV table. Replace the onload method with the following code:

def onload(){
    addEventListener(checkOrders,'Orders',,'all') 
    orderDV = streamTable(array(STRING, 0) as market, array(STRING, 0) as code, array(DOUBLE, 0) as price, array(INT, 0) as qty, array(TIMESTAMP, 0) as updateTime)
    share(orderDV,'orderDV')
    createDataViewEngine(name='orderDV', outputTable=objByName('orderDV'), keyColumns=`code, timeColumn=`updateTime, useSystemTime=true, includeOperationType=false)
    addEventListener(deleteOrder,'DeleteOrder',,'all') 
}

The orderDV table is shown below. With includeOperationType=false, the engine does not output records for deleted data, so the table contains fewer rows than when includeOperationType=true.

market code price qty updateTime
m1     c1   10    100 2026.04.17 17:35:42.313
m1     c2   10    100 2026.04.17 17:35:42.313
m1     c2   9.5   100 2026.04.17 17:35:42.313
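
Since the output table is a shared stream table, other sessions can subscribe to it. The following is a minimal sketch of such a subscriber, assuming the orderDV schema used with includeOperationType=false. The names latestOrders and mirrorLatest are hypothetical.

```dolphindb
// Keyed in-memory table: appending a row with an existing code overwrites that row
latestOrders = keyedTable(`code, 1:0, `market`code`price`qty`updateTime, [STRING, STRING, DOUBLE, INT, TIMESTAMP])
// Replay orderDV from its first row, then keep receiving new rows
subscribeTable(tableName="orderDV", actionName="mirrorLatest", offset=0, handler=append!{latestOrders}, msgAsTable=true)
// When the mirror is no longer needed, cancel the subscription:
// unsubscribeTable(tableName="orderDV", actionName="mirrorLatest")
```

Because deleted records are not emitted when includeOperationType=false, a mirror built this way would still hold the last c2 row after the DeleteOrder event.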

Example 2. This example demonstrates the effect of the throttle parameter. When throttle is set to 30 seconds, the data view engine writes data to the output table only at 30-second intervals. Modify the onload method as follows:

def onload(){
    addEventListener(checkOrders,'Orders',,'all') 
    orderDV = streamTable(array(STRING, 0) as market, array(STRING, 0) as code, array(DOUBLE, 0) as price, array(INT, 0) as qty, array(TIMESTAMP, 0) as updateTime)
    share(orderDV,'orderDV')
    createDataViewEngine(name='orderDV', outputTable=objByName('orderDV'), keyColumns=`code, timeColumn=`updateTime, useSystemTime=true, throttle=30s, includeOperationType=false)
    addEventListener(deleteOrder,'DeleteOrder',,'all') 
}

After ingesting the events into the engine, wait 30 seconds and then check the orderDV table. Only one record, c1, is in the table, because the DeleteOrder event entered the engine and removed the c2 record before the engine wrote its first output.

Related functions: createCEPEngine, deleteDataViewItems, dropDataViewEngine, getDataViewEngine