Using OOP to Develop Operators for Reactive State Engine

With the growing demand for real-time stream processing, efficient and scalable stream computing frameworks have become increasingly important. DolphinDB, a high-performance distributed time series database, not only excels in data storage and querying but also introduces object-oriented programming (OOP) to enhance code modularity, maintainability, and reusability through encapsulation, inheritance, and polymorphism.

This article presents two real-world examples to demonstrate how OOP can be used to implement stateful operators in DolphinDB’s reactive state engine, showcasing its practical value.

1. Overview of OOP

Starting from version 3.00.0, DolphinDB supports OOP. It can be applied to multiple scenarios, including:

  • Operator development in the reactive state engine: Before OOP support, some operators could only be built from complex higher-order functions or as plugins, both of which were difficult to maintain. OOP provides a clearer and more structured way to develop them.
  • Complex Event Processing (CEP): OOP can be used to define events and monitors in the CEP engine.

This tutorial focuses on using OOP to develop operators for DolphinDB’s reactive state engine.

2. Using OOP in DolphinDB

2.1 Defining a Class

The syntax for defining a class in DolphinDB is as follows:

class ClassName {
  attribute1 :: dataType
  attribute2 :: dataType
  // ...

  // Define constructor (same name as the class)
  def ClassName(arg1, arg2 /*, ... */) {
    attribute1 = arg1
    attribute2 = arg2
    // ...
  }

  // Define method
  def methodName(arg1, arg2 /*, ... */) {
    // ...
  }
}

For example, the following defines a Person class with two member variables: name (a string) and age (an integer). The class also includes a constructor and getter/setter methods for name:

class Person {
  // Variable declarations must precede method definitions
  name :: STRING
  age :: INT

  // Constructor definition
  def Person(name_, age_) {  // Parameter names must differ from member names
    name = name_
    age = age_
  }

  def setName(newName) {
    name = newName
  }

  def getName() {
    return name
  }
}

2.2 Instantiating an Object

Object methods are invoked using object.method(), while object attributes are accessed using object.member. Unlike scripting languages such as Python, DolphinDB does not allow direct assignment to object attributes from outside the class. To modify an attribute, the corresponding setter method must be used.

p = Person("Zhang San", 30)
// Call methods
print(p.getName())
p.setName("Li Si")
print(p.getName())
// Access an attribute
print(p.name)
p.name = "Wang Wu"  // Error: Direct assignment to object attributes is not allowed
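
The same rule is what makes a class usable as a stateful operator: state lives in an attribute and changes only through a method call. The following is a minimal sketch of that pattern; the class Counter, its attribute hits, and the method increment are hypothetical names introduced for illustration.

```dolphindb
// Hypothetical example: state is modified only through a method
class Counter {
  hits :: INT

  def Counter() {
    hits = 0
  }

  // Update the state and return the new value
  def increment() {
    hits = hits + 1
    return hits
  }
}

c = Counter()
c.increment()
c.increment()
print(c.hits)  // 2: the attribute can be read directly, but not assigned
```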

2.3 Declaring Member Variables

Member variables are defined using the syntax: memberName :: dataType.

Supported data types include:

  • Scalars: Basic data types such as INT, DOUBLE, and STRING, as well as temporal types.
  • Vectors: Declared like scalars, with VECTOR appended to indicate that the member variable is a vector. Examples include DOUBLE VECTOR and STRING VECTOR.
  • ANY: Used for other types (e.g., dictionaries, functions) or when no strict type enforcement is required.

For example:

x :: INT
y :: DOUBLE VECTOR
z :: ANY
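
The three kinds of declaration can appear together in one class. The sketch below is illustrative only: the class PriceBuffer, its attributes, and the method addPrice are hypothetical names, and the dictionary stored in the ANY member is just one possible use.

```dolphindb
// Hypothetical example combining scalar, vector, and ANY members
class PriceBuffer {
  // Scalar member
  sym :: STRING
  // Vector member
  prices :: DOUBLE VECTOR
  // Untyped member, used here to hold a dictionary
  extra :: ANY

  def PriceBuffer(sym_) {
    sym = sym_
    prices = double([])
    extra = dict(STRING, ANY)
  }

  // Append one price and return the number of stored prices
  def addPrice(price) {
    prices = prices join price
    return size(prices)
  }
}

buf = PriceBuffer("A")
buf.addPrice(10.5)
buf.addPrice(11.0)
print(buf.prices)  // holds the two values 10.5 and 11
```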

2.4 Variable Parsing in Class Methods

When a variable is referenced inside a member method, its name is resolved in the following order. The example after the list illustrates each step:

  1. Method Parameters: The system first checks whether the variable name matches one of the method's parameters. In the example, b matches the parameter of method, so b is parsed as the method parameter rather than the attribute of the same name.
  2. Attributes: If there is no matching method parameter, the system checks whether the variable is an attribute of the object. In the example, a is not a parameter of method, so it is parsed as the attribute a.
  3. Shared Variables: If the variable matches neither a method parameter nor an attribute, the system checks whether it is a shared variable defined outside the class. In the example, tbl is parsed as the shared table.
  4. Non-existence: If the variable is not found in any of the three steps above, an error is raised indicating that the variable does not exist.

share table(1:0, `sym`val, [SYMBOL, INT]) as tbl
go

class Test2 {
  a :: INT
  b :: DOUBLE

  def Test2() {
    a = 1
    b = 2.0
  }

  def method(b) {
    print(b)    // Parsed as method parameter 'b'
    print(a)    // Parsed as object attribute 'a'
    print(tbl)  // Parsed as shared variable 'tbl'
    print(qwert) // Undefined variable; throws an error
  }
}

This parsing order ensures that when using variables within member methods, the closest scope is considered first, following a logical progression from method parameters to attributes and then to shared variables.
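
Because the last line of method above references an undefined variable, that example cannot run to completion. The following is a runnable sketch of the first three steps only; the shared table scopeDemoTbl, the class ScopeDemo, and the method printScopes are hypothetical names chosen to avoid clashing with existing objects.

```dolphindb
// Hypothetical shared table, used only to illustrate step 3
share table(1:0, `sym`val, [SYMBOL, INT]) as scopeDemoTbl
go

class ScopeDemo {
  a :: INT
  b :: DOUBLE

  def ScopeDemo() {
    a = 1
    b = 2.0
  }

  def printScopes(b) {
    print(b)                   // Method parameter: the value passed in (5)
    print(a)                   // Attribute: 1
    print(size(scopeDemoTbl))  // Shared variable: the table has 0 rows
  }
}

d = ScopeDemo()
d.printScopes(5)
```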

2.5 self

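The self variable references the current instance of the class, similar to self in Python or this in Java and C++. In the example below, createCallback passes the instance to an external function through partial application (doSomething{self}), and nameShadow uses self.b to reach the attribute b when a method parameter of the same name hides it.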

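def doSomething(a) {
  // ...
}

class A {
  a :: INT
  b :: INT
  def A() {
    a = 1
    b = 2
  }
  def createCallback() {
    // Bind the current instance as the argument of doSomething
    return doSomething{self}
  }
  def nameShadow(b) {
    print(b)       // Method parameter 'b'
    print(self.b)  // Attribute 'b' of the instance
  }
}
a = A()
handler = a.createCallback()
a.nameShadow(3)  // Prints 3 (the parameter), then 2 (the attribute)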

3. Use Cases

The Reactive State Engine (RSE) is a high-performance and scalable computing framework in DolphinDB designed for processing real-time streaming data. RSE enables incremental computation and complex event processing by capturing and maintaining states through stateful operators. This section presents two examples to demonstrate how to develop UDF operators for RSE using OOP.

3.1 Cumulative Sum Operator: MyCumSum

Within the RSE, the built-in cumsum operator performs cumulative summation. This section demonstrates how to reimplement the operator using OOP to illustrate the development process.

  1. Define a class MyCumSum, where the operator's state is declared as a member variable.
  2. Implement an append method in the class to perform cumulative summation. This method takes a single row of input and returns the computed result.
  3. Create an RSE instance and register MyCumSum.append() as the operator. Inject data into the engine and observe the output.

The implementation is as follows:

class MyCumSum {
  sum :: DOUBLE
  def MyCumSum() {
    sum = 0.0
  }
  def append(value) {
    sum = sum + value
    return sum
  }
}

inputTable = table(1:0, `sym`val, [SYMBOL, DOUBLE])
result = table(1000:0, `sym`res, [SYMBOL, DOUBLE])

rse = createReactiveStateEngine(
          name="reactiveDemo",
          metrics = [<MyCumSum().append(val)>],
          dummyTable=inputTable,
          outputTable=result,
          keyColumn="sym")

data = table(take(`A, 100) as sym, rand(100.0, 100) as val)
rse.append!(data)

select * from data
select * from result

After execution, select * from data returns the randomly generated input and select * from result returns the corresponding output:

[Figure: Input Data]

[Figure: Output Data]

The OOP-based UDF operator correctly performs grouped cumulative summation, achieving the same functionality as the built-in cumsum operator in the RSE.
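
The data above contains a single key (A), so it does not exercise the grouping. One way to check the per-group behavior is to feed the same two-key input to the built-in cumsum and to MyCumSum and compare the results. This is a sketch, not part of the original example: it assumes the MyCumSum class above is already defined in the session, and the engine names, table names, and variable names are hypothetical.

```dolphindb
// Assumes the MyCumSum class defined above already exists in this session.
// All table, engine, and variable names below are hypothetical.
dummy = table(1:0, `sym`val, [SYMBOL, DOUBLE])
builtinOut = table(1000:0, `sym`res, [SYMBOL, DOUBLE])
oopOut = table(1000:0, `sym`res, [SYMBOL, DOUBLE])

builtinEngine = createReactiveStateEngine(
          name="cumsumBuiltin",
          metrics=[<cumsum(val)>],
          dummyTable=dummy,
          outputTable=builtinOut,
          keyColumn="sym",
          keepOrder=true)
oopEngine = createReactiveStateEngine(
          name="cumsumOop",
          metrics=[<MyCumSum().append(val)>],
          dummyTable=dummy,
          outputTable=oopOut,
          keyColumn="sym",
          keepOrder=true)

// Two interleaved groups: A receives 1, 3, 5, ... and B receives 2, 4, 6, ...
data2 = table(take(`A`B, 10) as sym, double(1..10) as val)
builtinEngine.append!(data2)
oopEngine.append!(data2)

// Prints true when both operators produce the same running sums
print(eqObj(builtinOut.res, oopOut.res))

// Release the engines so that the names can be reused
dropStreamEngine("cumsumBuiltin")
dropStreamEngine("cumsumOop")
```

Integer-valued doubles are used so that the comparison does not depend on floating-point rounding.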

3.2 Linear Recursion

Before OOP support, implementing linear recursion in the RSE required using the built-in stateIterate function. This function takes several parameters to define the stateful logic.

For example:

trade = table(take("A", 6) join take("B", 6) as sym,  1..12 as val0,  take(10, 12) as val1)

inputTable = streamTable(1:0, `sym`val0`val1, [SYMBOL, INT, INT])
outputTable = table(100:0, `sym`factor, [STRING, DOUBLE])
engine = createReactiveStateEngine(
  name="rsTest",
  metrics=<[stateIterate(val0, val1, 3, msum{, 3}, [0.5, 0.5])]>,
  dummyTable=inputTable,
  outputTable=outputTable,
  keyColumn=["sym"],
  keepOrder=true)

engine.append!(trade)
select * from outputTable

While stateIterate(val0, val1, 3, msum{, 3}, [0.5, 0.5]) in the above example implements linear recursion according to the rules of stateIterate, its logic may not be immediately intuitive, potentially hindering readability and maintainability.

Re-implementing the same logic with OOP makes the structure much clearer:

  • For the first 3 records of each group, the append method returns the value of initial directly and stores it in the moving window.
  • From the 4th record onward, the append method returns 0.5 * X + 0.5 * sum(movingWindow), where movingWindow holds the 3 most recent results, and then slides the window forward by one element.

This achieves the same functionality as stateIterate(val0, val1, 3, msum{, 3}, [0.5, 0.5]). The full implementation is as follows:

trade = table(take("A", 6) join take("B", 6) as sym,  1..12 as val0,  take(10, 12) as val1)

inputTable = streamTable(1:0, `sym`val0`val1, [SYMBOL, INT, INT])
outputTable = table(100:0, `sym`factor, [STRING, DOUBLE])

class MyIterateOperator {
	movingWindow :: DOUBLE VECTOR
	k :: INT
	def MyIterateOperator() {
		k = 0
		movingWindow = double([])
	}
	def append(X, initial) {
		if (k < 3) {
			k = k + 1
			movingWindow = movingWindow join initial
			return double(initial)
		}
		result = 0.5 * X + 0.5 * sum(movingWindow)
		movingWindow = movingWindow[1:] join result
		return double(result)
	}
}
engine2 = createReactiveStateEngine(
  name="rsTest2",
  metrics=<[MyIterateOperator().append(val0, val1)]>,
  dummyTable=inputTable,
  outputTable=outputTable,
  keyColumn=["sym"],
  keepOrder=true)
  
engine2.append!(trade)
select * from outputTable
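
The recursion can also be traced by calling the operator directly, outside the engine. The sketch below assumes the MyIterateOperator class above is already defined in the session and replays the values that group A receives (val0 = 1..6, val1 = 10); the variable names op and x are hypothetical.

```dolphindb
// Assumes MyIterateOperator from the listing above is defined in this session
op = MyIterateOperator()
for (x in 1..6) {
  print(op.append(x, 10))
}
// Values returned, in order: 10, 10, 10, 17, 21, 27
//   4th: 0.5 * 4 + 0.5 * (10 + 10 + 10) = 17
//   5th: 0.5 * 5 + 0.5 * (10 + 10 + 17) = 21
//   6th: 0.5 * 6 + 0.5 * (10 + 17 + 21) = 27
```

Stepping through one instance this way is a convenient check on the operator logic before it is registered in an engine.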

4. Summary and Outlook

This tutorial demonstrated how to develop user-defined stateful operators for DolphinDB's reactive state engine using OOP. Compared with expressing the same logic through built-in higher-order functions such as stateIterate, OOP-based implementations offer a clearer structure and better code readability.

Currently, OOP operators in DolphinDB are executed by the interpreter, which results in relatively lower performance. In the future, we plan to introduce just-in-time (JIT) compilation to optimize the execution of OOP operators, aiming to improve both development efficiency and runtime performance.