Using OOP to Develop Operators for the Reactive State Engine
With the growing demand for real-time stream processing, efficient and scalable stream computing frameworks have become increasingly important. DolphinDB, a high-performance distributed time series database, not only excels in data storage and querying but also introduces object-oriented programming (OOP) to enhance code modularity, maintainability, and reusability through encapsulation, inheritance, and polymorphism.
This article presents two real-world examples to demonstrate how OOP can be used to implement stateful operators in DolphinDB’s reactive state engine, showcasing its practical value.
1. Overview of OOP
Starting from version 3.00.0, DolphinDB supports OOP. It can be applied to multiple scenarios, including:
- Operator development in the reactive state engine: Before OOP was supported, certain operators could only be developed with complex higher-order functions or plugins, which were difficult to maintain. OOP simplifies development with a clearer, more structured approach.
- Complex Event Processing (CEP): OOP is used to define events and monitors in the CEP engine.
This tutorial focuses on using OOP to develop operators for DolphinDB’s reactive state engine.
2. Using OOP in DolphinDB
2.1 Defining a Class
The syntax for defining a class in DolphinDB is as follows:
class ClassName {
    attribute1 :: dataType
    attribute2 :: dataType
    // ...
    // Define the constructor (it has the same name as the class)
    def ClassName(arg1, arg2 /*, ... */) {
        attribute1 = arg1
        attribute2 = arg2
        // ...
    }
    // Define a method
    def methodName(arg1, arg2 /*, ... */) {
        // ...
    }
}
For example, the following code defines a Person class with two member variables: name (a string) and age (an integer). The class also includes a constructor and getter/setter methods for name:
class Person {
    // Variable declarations must precede method definitions
    name :: STRING
    age :: INT
    // Constructor definition
    def Person(name_, age_) {  // Parameter names must differ from member names
        name = name_
        age = age_
    }
    def setName(newName) {
        name = newName
    }
    def getName() {
        return name
    }
}
2.2 Instantiating an Object
An object is created by calling its constructor, for example Person("Zhang San", 30). Object methods are invoked with object.method(), and object attributes are read with object.member. Unlike scripting languages such as Python, DolphinDB does not allow assigning values to object attributes directly from outside the class. To modify an attribute, call the corresponding setter method.
p = Person("Zhang San", 30)
print(p.getName())  // Zhang San
// Call a method
p.setName("Li Si")
print(p.getName())  // Li Si
// Read an attribute
print(p.name)       // Li Si
p.name = "Wang Wu"  // Error: direct assignment to object attributes is not allowed
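Because attributes can only be changed through methods, a method is a natural place to validate new values before they are stored. The following minimal sketch uses a hypothetical Account class (not part of DolphinDB) whose balance is modified only through deposit(), which rejects non-positive amounts with throw.

```dolphindb
// Hypothetical class: every change to balance goes through deposit()
class Account {
    balance :: DOUBLE
    def Account(initBalance) {
        balance = initBalance
    }
    def deposit(amount) {
        // Validate the input before modifying the attribute
        if (amount <= 0) {
            throw "deposit amount must be positive"
        }
        balance = balance + amount
        return balance
    }
}

acc = Account(100.0)
acc.deposit(50.0)      // returns 150
print(acc.balance)     // reading an attribute from outside is allowed
// acc.balance = 0.0   // not allowed: attributes cannot be assigned directly
```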
2.3 Declaring Member Variables
Member variables are declared with the syntax memberName :: dataType.
Supported data types include:
- Scalars: Basic data types such as INT, DOUBLE, STRING, TEMPORAL, etc.
- Vectors: Declared like scalars, but with VECTOR appended to the type to indicate that the member variable is a vector, e.g., DOUBLE VECTOR or STRING VECTOR.
- ANY: Used for other types (e.g., dictionaries or functions) or when no strict type enforcement is required.
For example:
x :: INT
y :: DOUBLE VECTOR
z :: ANY
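As a sketch of how the three kinds of declarations can be combined, the hypothetical Aggregator class below keeps a scalar label, a DOUBLE VECTOR buffer, and an ANY member that stores a function (here the built-in avg). It assumes a function stored in an ANY member can be invoked through the built-in call function.

```dolphindb
// Hypothetical class mixing scalar, vector, and ANY members
class Aggregator {
    // Scalar member
    label :: STRING
    // Vector member
    buf :: DOUBLE VECTOR
    // ANY member: here it holds a function
    aggFunc :: ANY
    def Aggregator(label_, aggFunc_) {
        label = label_
        buf = double([])
        aggFunc = aggFunc_
    }
    def add(v) {
        buf = buf join v
        // call() applies the stored function to the buffered values
        return call(aggFunc, buf)
    }
}

agg = Aggregator("demo", avg)
agg.add(1.0)    // avg of [1.0] is 1.0
agg.add(3.0)    // avg of [1.0, 3.0] is 2.0
```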
2.4 Variable Parsing in Class Methods
When a variable is referenced inside a member method, DolphinDB parses it in the following order, as the example below demonstrates:
- Method Parameters: The system first checks whether the variable name matches one of the method's parameters. In the example, b is referenced inside method() and matches its parameter name, so b is parsed as the method parameter.
- Attributes: If no method parameter matches, the system checks whether the variable is an attribute of the object. In the example, a is not a parameter of method(), so it is parsed as the attribute a.
- Shared Variables: If the variable does not match either a method parameter or an attribute, the system checks if it is a shared variable defined outside the class.
- Non-existence: If the variable is not found through the above three steps, an error is raised indicating that the variable does not exist.
share table(1:0, `sym`val, [SYMBOL, INT]) as tbl
go
class Test2 {
    a :: INT
    b :: DOUBLE
    def Test2() {
        a = 1
        b = 2.0
    }
    def method(b) {
        print(b)      // Parsed as the method parameter 'b'
        print(a)      // Parsed as the object attribute 'a'
        print(tbl)    // Parsed as the shared variable 'tbl'
        print(qwert)  // Undefined variable; throws an error
    }
}
This parsing order ensures that when using variables within member methods, the closest scope is considered first, following a logical progression from method parameters to attributes and then to shared variables.
2.5 self
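The self variable refers to the current instance of the class, similar to self in Python or this in Java and C++. It is needed when a method must pass the object itself to other code, as createCallback does below by binding self to doSomething through partial application, or when a method parameter shadows an attribute of the same name: in nameShadow, b refers to the parameter and self.b to the attribute.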
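def doSomething(a) {
    // ...
}
class A {
    a :: INT
    b :: INT
    def A() {
        a = 1
        b = 2
    }
    def createCallback() {
        // Fix the first argument of doSomething to the current object
        return doSomething{self}
    }
    def nameShadow(b) {
        print(b)       // the parameter b
        print(self.b)  // the attribute b
    }
}
a = A()
handler = a.createCallback()
a.nameShadow(3)  // prints 3, then 2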
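The example above creates a callback but never calls it. The following sketch, using a hypothetical describePoint function and Point class, shows the callback in use: describePoint{self} fixes the function's only argument to the object, so the returned callback takes no arguments and reads the object's attributes when invoked.

```dolphindb
// Hypothetical helper that reads attributes of the object passed to it
def describePoint(pt) {
    return "(" + string(pt.x) + ", " + string(pt.y) + ")"
}

class Point {
    x :: INT
    y :: INT
    def Point(x_, y_) {
        x = x_
        y = y_
    }
    def describer() {
        // Partial application binds the current object as the argument
        return describePoint{self}
    }
}

pt = Point(1, 2)
f = pt.describer()
f()      // "(1, 2)"
```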
3. Use Cases
The Reactive State Engine (RSE) is a high-performance and scalable computing framework in DolphinDB designed for processing real-time streaming data. RSE enables incremental computation and complex event processing by capturing and maintaining states through stateful operators. This section presents two examples to demonstrate how to develop UDF operators for RSE using OOP.
3.1 Cumulative Sum Operator: MyCumSum
Within the RSE, the built-in cumsum operator performs cumulative summation. This section reimplements the operator with OOP to illustrate the development process.
- Define a class MyCumSum and declare the operator's state as a member variable.
- Implement an append method in the class to perform the cumulative summation. The method takes the input value of one row and returns the computed result.
- Create an RSE instance and register MyCumSum().append(val) as its metric. Then inject data into the engine and observe the output.
The implementation is as follows:
class MyCumSum {
    // Operator state: the running total (kept separately for each key)
    sum :: DOUBLE
    def MyCumSum() {
        sum = 0.0
    }
    // Called once per input row; returns the cumulative sum so far
    def append(value) {
        sum = sum + value
        return sum
    }
}
inputTable = table(1:0, `sym`val, [SYMBOL, DOUBLE])
result = table(1000:0, `sym`res, [SYMBOL, DOUBLE])
rse = createReactiveStateEngine(
    name="reactiveDemo",
    metrics=[<MyCumSum().append(val)>],
    dummyTable=inputTable,
    outputTable=result,
    keyColumn="sym")
data = table(take(`A, 100) as sym, rand(100.0, 100) as val)
rse.append!(data)
select * from data
select * from result
After execution, the randomly generated input and the corresponding output are as follows:
[Figure: Input Data]
[Figure: Output Data]
The OOP-based UDF operator correctly computes the cumulative sum for each key, matching the built-in cumsum operator in the RSE.
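The sample data contains only the key A, so it does not show the per-key behavior. The following sketch is meant to run in the same session after the script above, because it reuses the MyCumSum class. It feeds two keys through a new engine (the name cumsumCheck and the variables data2, result2, and refResult are hypothetical) and compares the result with the built-in cumsum computed per group via context by. The input is sorted by sym and keepOrder=true is set so that both results line up row by row.

```dolphindb
// Continues the script above: reuses the MyCumSum class defined there
// Input with two keys, already sorted by sym
data2 = table(take(`A, 5) join take(`B, 5) as sym, rand(100.0, 10) as val)
inputTable2 = table(1:0, `sym`val, [SYMBOL, DOUBLE])
result2 = table(100:0, `sym`res, [SYMBOL, DOUBLE])
rse2 = createReactiveStateEngine(
    name="cumsumCheck",
    metrics=[<MyCumSum().append(val)>],
    dummyTable=inputTable2,
    outputTable=result2,
    keyColumn="sym",
    keepOrder=true)
rse2.append!(data2)

// Reference result: built-in cumsum computed separately for each sym group
refResult = select sym, cumsum(val) as res from data2 context by sym
// Largest absolute difference between the two results; expected to be (close to) 0
print(max(abs(refResult.res - result2.res)))

// Release the engine so that the snippet can be run again
dropStreamEngine("cumsumCheck")
```

Stream engine names must be unique, which is why the sketch drops its engine at the end; the same applies to reactiveDemo if the original script is rerun in one session.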
3.2 Linear Recursion
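Before OOP was supported, implementing linear recursion in the RSE required the built-in stateIterate function, whose parameters (the input X, the initial values, the size of the initial window, the iteration function, and the combination coefficients) together define the stateful logic. For example: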
trade = table(take("A", 6) join take("B", 6) as sym, 1..12 as val0, take(10, 12) as val1)
inputTable = streamTable(1:0, `sym`val0`val1, [SYMBOL, INT, INT])
outputTable = table(100:0, `sym`factor, [STRING, DOUBLE])
engine = createReactiveStateEngine(
    name="rsTest",
    metrics=<[stateIterate(val0, val1, 3, msum{, 3}, [0.5, 0.5])]>,
    dummyTable=inputTable,
    outputTable=outputTable,
    keyColumn=["sym"],
    keepOrder=true)
engine.append!(trade)
select * from outputTable
Although stateIterate(val0, val1, 3, msum{, 3}, [0.5, 0.5]) in the above example implements linear recursion, the recursion it performs is not obvious from the call itself, which can hinder readability and maintainability.
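For reference, the recursion encoded by this call can be written out as follows, based on our reading of the stateIterate parameters: y_t is the output for the t-th row of one sym group, the first 3 outputs are taken from val1, and each later output combines val0 with the sum of the previous 3 outputs (msum{, 3}) using the coefficients 0.5 and 0.5. The worked values are for group A (val0 = 1..6, val1 = 10).

```latex
y_t =
\begin{cases}
\mathrm{val1}_t, & t \le 3 \\
0.5\,\mathrm{val0}_t + 0.5\,(y_{t-1} + y_{t-2} + y_{t-3}), & t > 3
\end{cases}

\text{Group A: } y_1 = y_2 = y_3 = 10,\quad
y_4 = 0.5 \cdot 4 + 0.5 \cdot 30 = 17,\quad
y_5 = 0.5 \cdot 5 + 0.5 \cdot 37 = 21,\quad
y_6 = 0.5 \cdot 6 + 0.5 \cdot 48 = 27
```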
Re-implementing the same logic with OOP makes its structure much clearer:
- For the first 3 rows of each group, the append method returns the value of initial directly and adds it to the moving window.
- From the 4th row on, append computes the weighted average 0.5 * X + 0.5 * (sum of the 3 values in the moving window), returns it, and slides the window forward by dropping its oldest value and appending the new result.
This achieves the same functionality as stateIterate(val0, val1, 3, msum{, 3}, [0.5, 0.5]). The OOP implementation is as follows:
trade = table(take("A", 6) join take("B", 6) as sym, 1..12 as val0, take(10, 12) as val1)
inputTable = streamTable(1:0, `sym`val0`val1, [SYMBOL, INT, INT])
outputTable = table(100:0, `sym`factor, [STRING, DOUBLE])
class MyIterateOperator {
    // The last 3 outputs of this group
    movingWindow :: DOUBLE VECTOR
    // Number of rows processed so far
    k :: INT
    def MyIterateOperator() {
        k = 0
        movingWindow = double([])
    }
    def append(X, initial) {
        // Initial window: output the initial value and store it
        if (k < 3) {
            k = k + 1
            movingWindow = movingWindow join initial
            return double(initial)
        }
        // Combine the current input with the sum of the previous 3 outputs
        result = 0.5 * X + 0.5 * sum(movingWindow)
        // Slide the window: drop the oldest output and append the new one
        movingWindow = movingWindow[1:] join result
        return double(result)
    }
}
engine2 = createReactiveStateEngine(
    name="rsTest2",
    metrics=<[MyIterateOperator().append(val0, val1)]>,
    dummyTable=inputTable,
    outputTable=outputTable,
    keyColumn=["sym"],
    keepOrder=true)
engine2.append!(trade)
select * from outputTable
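To check the result, the following sketch continues the script above (run it right after engine2.append!(trade)) and compares the output with values worked out by hand from the recursion: for group A the outputs are 10, 10, 10, 17, 21, 27, and for group B (val0 = 7..12) they are 10, 10, 10, 20, 25.5, 33.75. Because keepOrder=true, the output follows the input order, so group A's rows come first. The variable names expected and actual are illustrative.

```dolphindb
// Continues the script above: values derived by hand from the recursion
expected = [10.0, 10.0, 10.0, 17.0, 21.0, 27.0, 10.0, 10.0, 10.0, 20.0, 25.5, 33.75]
// exec returns the factor column as a vector, in output order
actual = exec factor from outputTable
// true if the OOP operator reproduces the stateIterate recursion
print(eqObj(actual, expected))
```

All of these values are exactly representable as doubles, so an exact comparison with eqObj is sufficient here.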
4. Summary and Outlook
This tutorial demonstrated how to develop user-defined stateful operators for DolphinDB's Reactive State Engine using OOP. Compared with expressing the same logic through built-in higher-order functions such as stateIterate, OOP-based implementations have a clearer structure and are easier to read. Currently, OOP operators in DolphinDB are executed in interpreted mode, so their performance is relatively low. In the future, we plan to introduce just-in-time (JIT) compilation for OOP operators, aiming to improve both development efficiency and runtime performance.