EventSender

The Python API 3.0.0.0 provides the EventSender class for writing serialized event data to heterogeneous stream tables. These stream tables will be subscribed by DolphinDB CEP engine to capture and process the corresponding events as they occur.

Methods

Constructor

The constructor declaration for EventSender is as follows:

EventSender(ddbSession, tableName, eventSchema, eventTimeFields=None, commonFields=None)

Arguments

  • ddbSession: required. A session already connected to DolphinDB.
  • tableName: str, required. The name of the heterogeneous stream table to be written.
  • eventSchema: required. The list of events to be sent, e.g., .
    • The events specified must be inherited from the Event class.
    • The schema of each event defined in API and DolphinDB must be the same.
  • eventTimeFields: str or list of strings, defaults to None, optional. The time field for each event. It must be specified when the heterogeneous stream table "tableName" contains the time column.
    • It is a string if the field name is the same for all events.
    • It is a list of strings if field names vary across events. The order of strings must match that of the events specified in eventSchema.
  • commonFields: list of strings, defaults to None, optional. It must be specified when the heterogeneous stream table "tableName" contains the common column.

sendEvent

Use the sendEvent method to send events to DolphinDB server.

sendEvent(event)

Arguments

  • event: The event instance.

A Complete Example

The following example demonstrates the process of sending events from API to DolphinDB server:

DolphinDB server:

  1. Define events and the monitor.
  2. Create a CEP engine.
  3. Create and subscribe to a heterogeneous stream table.

Python API:

  1. Define events.
  2. Write events to the heterogeneous stream table.

Defining Events and the Monitor

In the following example, we define two events (EventA and EventB), a shared in-memory table ("result"), and a monitor (Monitor).

Events

Both EventA and EventB contain attribute eventTime of TIMESTAMP type and a of INT type. EventB contains another attribute b of DOUBLE type.

class EventA {
    eventTime::TIMESTAMP
    a::INT
    def EventA(a_) {
        eventTime = now()
        a = a_
    }
}

class EventB {
    eventTime::TIMESTAMP
    a::INT
    b::DOUBLE
    def EventB(a_, b_) {
        eventTime = now()
        a = a_
        b = b_
    }
}

Shared In-Memory Table

Create table "result".

share table(100:0, `eventTime`eventType`a`b, [TIMESTAMP, SYMBOL, INT, DOUBLE]) as result

Monitor

For the Monitor class,

  • Define two member functions updateEventA and updateEventB to update data of EventA and EventB to "result".
  • Define member function onload. The onload method uses the addEventListener function to register two listeners that continuously write event data to the "result" table as long as new instances of EventA or EventB are captured.
class Monitor {
    def Monitor(){}
    def updateEventA(event) {
        insert into result values(event.eventTime, "A", event.a, NULL)
    }
    def updateEventB(event) {
        insert into result values(event.eventTime, "B", event.a, event.b)
    }
    def onload() {
        addEventListener(updateEventA,"EventA",,"all")
        addEventListener(updateEventB,"EventB",,"all")
    }
}

Creating a CEP Engine

When creating a CEP engine, you need to define a dummy table whose schema is the same as the heterogeneous stream table to which the engine subscribes. The table schema contains the following columns:

(1) optional. A time column for event times. To filter the stream table using event times, or to use time fields for processing, you can add a time column as the first column.

(2) required. A STRING/SYMBOL column for events.

(3) required. A BLOB type for serialized event data.

(4) optional. Columns for event fields with the same attibute. To filter the stream table using common fields, you can add one or more columns following the above columns.

Follow the example above, as both EventA and EventB have attributes eventTime and a, we specify a time column "eventTime" and a column "a" as the common field.

The schema of the stream table is . Use the following DolphinDB script to create a CEP engine:

dummy = table(100:0, `eventTime`eventType`blob`a, [TIMESTAMP, STRING, BLOB, INT])
engine = createCEPEngine('cep', <Monitor()>, dummy, [EventA, EventB], 1, "eventTime")

Subscribing to Events

The CEP engine receives event data with the subscription to a heterogeneous stream table. Define a stream table "input" in DolphinDB and initiate the subscription:

share streamTable(100:0, `eventTime`eventType`blob`a, [TIMESTAMP, STRING, BLOB, INT]) as input
subscribeTable(,`input, `action, 0, engine, true)

Defining Events on API

In Python API, define events EventA and EventB with the same schema as events defined in server.

from dolphindb.cep import Event
from dolphindb.typing import Scalar
import dolphindb.settings as keys


class EventA(Event):
    eventTime: Scalar[keys.DT_TIMESTAMP]
    a: Scalar[keys.DT_INT]


class EventB(Event):
    eventTime: Scalar[keys.DT_TIMESTAMP]
    a: Scalar[keys.DT_INT]
    b: Scalar[keys.DT_DOUBLE]

Writing Event Data to Stream Table

Construct a EventSender. The EventA and EventB (specified by eventSchema) will be sent to stream table "input" (specified by tableName). Since the "input" table contains time column and column for common field, eventTimeFields and commonFields are specified as "eventTime" and , respectively.

import dolphindb as ddb
from dolphindb.cep import EventSender

s = ddb.Session()
s.connect("localhost", 8848, "admin", "123456")
sender = EventSender(s, "input", [EventA, EventB], "eventTime", ["a"])

Then, use sendEvent method to write event instances to EventA and EventB.

import numpy as np
sender.sendEvent(EventA(np.datetime64("2024-03-31T12:00:00.123", "ms"), 1))
sender.sendEvent(EventB(np.datetime64("2024-03-31T12:00:00.456", "ms"), 2, 3.3))

Querying Results

The output is as follows:

print(s.run("select * from result"))
# output
                eventTime eventType  a    b
0 2024-03-31 12:00:00.123         A  1  NaN
1 2024-03-31 12:00:00.456         B  2  3.3