streamEventSerializer
Syntax
streamEventSerializer(name, eventSchema, outputTable, [eventTimeField],
[commonField])
Details
Serializes events into BLOB and writes them to a heterogeneous stream table.
Parameters
name is a string indicating the engine name. It consists of letters, digits, and underscores(_) and must start with a letter.
eventSchema is a table or a scalar/vector of class definition of event types, indicating the data to be serialized. If it is a table, it must have the schema as follows:
| Column | Data Type | Comment |
|---|---|---|
| eventType | STRING | The event type. |
| eventField | STRING | The field names (separated by comma) of the event type. |
| fieldType (optional) | STRING | Data types (separated by comma) of eventField. |
| fieldTypeId | INT[] | Data type IDs of eventField. |
| fieldFormId | INT[] | Data form IDs of eventField (0: scalar; 1: vector; 2: pair; 3: matrix; 4: set; 5: dictionary; 6: table). Currently, only 0 and 1 can be specified. |
outputTable is a non-partitioned in-memory table or a stream table for outputting the results. The output columns are in the following order:
(1) A time column of TIMESTAMP type (if eventTimeField is specified);
(2) A SYMBOL or STRING column indicating the events;
(3) A BLOB column that stores the serialized result of each event;
(4) The column(s) with the same names and data types (if commonField specified).
eventTimeField (optional) is a string scalar or vector indicating the name(s) of time field for the event(s).
-
If all events share the same time field name, simply specify it as a single string.
-
If the time field varies among events, specify a vector with the same length as eventSchema. Each element corresponds to one time field.
commonField (optional) is a string scalar or vector indicating the field(s) with the same name and data type. If specified, the common fields can be filtered out during subscription.
Returns
A table object.
Examples
class MarketData{
market :: STRING
code :: STRING
price :: DOUBLE
qty :: INT
def MarketData(m,c,p,q){
market = m
code = c
price = p
qty = q
}
}
class Orders{
trader :: STRING
market :: STRING
code :: STRING
price :: DOUBLE
qty :: INT
def Orders(t, m,c,p,q){
trader = t
market = m
code = c
price = p
qty = q
}
}
class Trades{
trader :: STRING
market :: STRING
code :: STRING
price :: DOUBLE
qty :: INT
def Trades(t, m,c,p,q){
trader = t
market = m
code = c
price = p
qty = q
}
}
share streamTable(array(STRING, 0) as eventType, array(BLOB, 0) as blobs) as events
serializer = streamEventSerializer(name=`serOutput, eventSchema=[MarketData, Orders, Trades], outputTable=events)
The eventTimeField parameter can be used to specify the name of the time field in the event, indicating that the serializer should use this field as the event time. In this case, the first column of the output table must be a timestamp.
If filtering based on certain fields is required, the commonField parameter can be used to specify the relevant field names.
// Define the event class
class MarketData{
market :: STRING
code :: STRING
price :: DOUBLE
qty :: INT
timestamp :: TIMESTAMP
def MarketData(m,c,p,q){
market = m
code = c
price = p
qty = q
timestamp = now()
}
}
// Define the monitor
class TestMonitor: CEPMonitor {
def TestMonitor(){
}
def processMarketData(event){
// Send the event to the output table (serializer)
emitEvent(event)
}
def onload(){
// Listen for MarketData events and send them to the serializer
addEventListener(handler=processMarketData, eventType="MarketData", times="all")
}
}
// Create the output table
share(streamTable(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs, array(STRING, 0) as market, array(STRING, 0) as code), `events)
// Create the serializer
serializer = streamEventSerializer(name=`serOutput,
eventSchema=[MarketData],
outputTable=events,
eventTimeField="timestamp",
commonField=["market", "code"])
// Create the CEP engine
dummy = table(array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
engine = createCEPEngine(name="testEngine",
monitors=<TestMonitor()>,
dummyTable=dummy,
eventSchema=[MarketData],
outputTable=serializer)
// Create and ingest events
md1 = MarketData("SHSE", "000001", 10.50, 1000)
md2 = MarketData("SZSE", "000002", 15.80, 500)
// Ingest events through the CEP engine
appendEvent(engine, md1)
appendEvent(engine, md2)
// Check results
select * from events
| eventTime | eventType | blobs | market | code |
|---|---|---|---|---|
| 2026.04.10 19:43:13.585 | MarketData | SHSE000001%@�\u0003���x�\u0001 | SHSE | 000001 |
| 2026.04.10 19:43:13.585 | MarketData | SZSE000002������/@�\u0001���x�\u0001 | SZSE | 000002 |
