streamEventSerializer

Syntax

streamEventSerializer(name, eventSchema, outputTable, [eventTimeField], [commonField])

Details

Serializes events into BLOB and writes them to a heterogeneous stream table.

Parameters

name is a string indicating the engine name. It consists of letters, digits, and underscores(_) and must start with a letter.

eventSchema is a table or a scalar/vector of class definition of event types, indicating the data to be serialized. If it is a table, it must have the schema as follows:

Column Data Type Comment
eventType STRING The event type.
eventField STRING The field names (separated by comma) of the event type.
fieldType (optional) STRING Data types (separated by comma) of eventField.
fieldTypeId INT[] Data type IDs of eventField.
fieldFormId INT[] Data form IDs of eventField (0: scalar; 1: vector; 2: pair; 3: matrix; 4: set; 5: dictionary; 6: table). Currently, only 0 and 1 can be specified.

outputTable is a non-partitioned in-memory table or a stream table for outputting the results. The output columns are in the following order:

(1) A time column of TIMESTAMP type (if eventTimeField is specified);

(2) A SYMBOL or STRING column indicating the events;

(3) A BLOB column that stores the serialized result of each event;

(4) The column(s) with the same names and data types (if commonField specified).

eventTimeField (optional) is a string scalar or vector indicating the name(s) of time field for the event(s).

  • If all events share the same time field name, simply specify it as a single string.

  • If the time field varies among events, specify a vector with the same length as eventSchema. Each element corresponds to one time field.

commonField (optional) is a string scalar or vector indicating the field(s) with the same name and data type. If specified, the common fields can be filtered out during subscription.

Returns

A table object.

Examples

class MarketData{
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def MarketData(m,c,p,q){
        market = m
        code = c
        price = p
        qty = q
    }
}
class Orders{
    trader :: STRING
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def Orders(t, m,c,p,q){
        trader = t
        market = m
        code = c
        price = p
        qty = q
    }
}
class Trades{
    trader :: STRING
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    def Trades(t, m,c,p,q){
        trader = t
        market = m
        code = c
        price = p
        qty = q
    }
}
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as events
serializer = streamEventSerializer(name=`serOutput, eventSchema=[MarketData, Orders, Trades], outputTable=events)

The eventTimeField parameter can be used to specify the name of the time field in the event, indicating that the serializer should use this field as the event time. In this case, the first column of the output table must be a timestamp.

If filtering based on certain fields is required, the commonField parameter can be used to specify the relevant field names.

// Define the event class  
class MarketData{  
    market :: STRING  
    code :: STRING  
    price :: DOUBLE  
    qty :: INT  
    timestamp :: TIMESTAMP  
    def MarketData(m,c,p,q){  
        market = m  
        code = c  
        price = p  
        qty = q  
        timestamp = now()  
    }  
}  
  
// Define the monitor 
class TestMonitor: CEPMonitor {  
    def TestMonitor(){  
    } 
      
    def processMarketData(event){  
        // Send the event to the output table (serializer)  
        emitEvent(event)  
    }
          
    def onload(){  
        // Listen for MarketData events and send them to the serializer  
        addEventListener(handler=processMarketData, eventType="MarketData", times="all")  
    }       
  
}  
  
// Create the output table  
share(streamTable(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs, array(STRING, 0) as market, array(STRING, 0) as code), `events) 
  
// Create the serializer  
serializer = streamEventSerializer(name=`serOutput,   
                                 eventSchema=[MarketData],   
                                 outputTable=events,  
                                 eventTimeField="timestamp",  
                                 commonField=["market", "code"])  
  
// Create the CEP engine  
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)  
engine = createCEPEngine(name="testEngine",   
                       monitors=<TestMonitor()>,   
                       dummyTable=dummy,   
                       eventSchema=[MarketData],  
                       outputTable=serializer)  
  
// Create and ingest events  
md1 = MarketData("SHSE", "000001", 10.50, 1000)  
md2 = MarketData("SZSE", "000002", 15.80, 500)  
  
// Ingest events through the CEP engine  
appendEvent(engine, md1)  
appendEvent(engine, md2)  
  
// Check results  
select * from events
eventTime eventType blobs market code
2026.04.10 19:43:13.585 MarketData SHSE000001%@�\u0003���x�\u0001 SHSE 000001
2026.04.10 19:43:13.585 MarketData SZSE000002������/@�\u0001���x�\u0001 SZSE 000002