Use Cases
Developing operators for reactive state engine
The OOP paradigm simplifies operator development. Traditional approaches require complex higher-order functions such as genericIterateState, or plugins written in C++. With OOP, operator code becomes more intuitive and easier to understand.
// Stateful accumulator: every append() call folds one more value into a running total.
class MyCumSum {
  // Running total kept between append() calls.
  sum :: DOUBLE
  // Constructor: the total starts at zero.
  def MyCumSum() {
    sum = 0.0
  }
  // Fold `value` into the running total and hand back the updated total.
  def append(value) {
    sum = add(sum, value)
    return sum
  }
}
// Metric expressed as meta code: column `val` is passed to MyCumSum.append.
metrics = [<MyCumSum().append(val)>]
// Empty output table for the results, and an empty table describing the input schema.
result = table(1000:0, ["sym", "res"], [SYMBOL, DOUBLE])
inputTable = table(1:0, ["sym", "val"], [SYMBOL, DOUBLE])
// Reactive state engine keyed on `sym`, writing its output to `result`.
rse = createReactiveStateEngine(name="reactiveDemo", metrics=metrics, dummyTable=inputTable, outputTable=result, keyColumn="sym")
// Push 100 random rows (sym drawn from A/B/C, val from rand(100.0, 100)) into the engine.
append!(rse, table(rand(`A`B`C, 100) as sym, rand(100.0, 100) as val))
select TOP 10 * from result
Returns:
| sym | res | 
|---|---|
| B | 23.136844485998154 | 
| C | 97.25657706148922 | 
| C | 119.75046650040895 | 
| B | 28.15541410818696 | 
| C | 208.6210786132142 | 
| A | 17.531023011542857 | 
| B | 127.66627178061754 | 
| C | 286.6357475752011 | 
| A | 45.87050362024456 | 
| C | 343.2471259729937 | 
A class constructor can accept parameters, but only constants can be passed as its arguments. For example:
// Best-effort cleanup so the engine name "reactiveDemo" can be reused below.
// The drop is guarded so this example can also be run when no engine with that
// name exists yet; any error from dropStreamEngine is deliberately ignored.
try {
    dropStreamEngine("reactiveDemo")
} catch (exp) {
}
// Running-sum operator whose starting total is supplied by the caller.
class MyCumSum {
  // Total accumulated so far.
  sum :: DOUBLE
  // Constructor: seed the total with `initialValue`.
  def MyCumSum(initialValue) {
    sum = initialValue
  }
  // Add `value` to the total, store it, and return the new total.
  def append(value) {
    sum = add(sum, value)
    return sum
  }
}
// Metric as meta code: the constructor receives the constant 0.0, append receives column `val`.
metrics = [<MyCumSum(0.0).append(val)>]
// Empty output table for the results, and an empty table describing the input schema.
result = table(1000:0, ["sym", "res"], [SYMBOL, DOUBLE])
inputTable = table(1:0, ["sym", "val"], [SYMBOL, DOUBLE])
// Reactive state engine keyed on `sym`, writing its output to `result`.
rse = createReactiveStateEngine(name="reactiveDemo", metrics=metrics, dummyTable=inputTable, outputTable=result, keyColumn="sym")
// Push 100 random rows (sym drawn from A/B/C, val from rand(100.0, 100)) into the engine.
append!(rse, table(rand(`A`B`C, 100) as sym, rand(100.0, 100) as val))
select TOP 10 * from result
Returns:
| sym | res | 
|---|---|
| C | 79.73966575227678 | 
| A | 50.2432547044009 | 
| C | 108.56282587628812 | 
| A | 137.95565224718302 | 
| C | 179.22195203136653 | 
| B | 41.10089084133506 | 
| C | 253.5685545997694 | 
| A | 173.4938955400139 | 
| C | 262.32025355566293 | 
| B | 105.45554195996374 | 
In addition to input columns, the arguments of append can be constants or lambda expressions, as in the following example:
// Accumulator whose combining step is supplied by the caller as a function.
class MyCumSum {
  // Accumulated state carried between append() calls.
  sum :: DOUBLE
  // Constructor: the initial state is produced by calling a local lambda.
  def MyCumSum() {
    initFn = def (): 0
    sum = initFn()
  }
  // Combine the current state with `value` via `f(state, value)`,
  // store the outcome as the new state, and return it.
  def append(value, f) {
    sum = f(sum, value)
    return sum
  }
}
// Metric as meta code: column `val` plus a lambda that tells append how to combine values.
metrics = [<MyCumSum().append(val, def (a, b): a + b)>]
// Empty input-schema table and one output table per engine.
inputTable = table(1:0, `sym`val, [SYMBOL, DOUBLE])
result = table(1000:0, `sym`res, [SYMBOL, DOUBLE])
result2 = table(1000:0, `sym`res, [SYMBOL, DOUBLE])
// Best-effort cleanup of engines left over from earlier runs. Each drop has its
// own guard so that a failure on the first name cannot skip the second one.
try {
    dropStreamEngine("reactiveDemo")
} catch (exp) {
}
try {
    dropStreamEngine("reactiveDemo2")
} catch (exp) {
}
// Engine driven by the user-defined class.
rse = createReactiveStateEngine(
          name="reactiveDemo",
          metrics =metrics,
          dummyTable=inputTable,
          outputTable=result,
          keyColumn="sym")
// Reference engine using the built-in cumsum on the same column.
rse2 = createReactiveStateEngine(
          name="reactiveDemo2",
          metrics =[<cumsum(val)>],
          dummyTable=inputTable,
          outputTable=result2,
          keyColumn="sym")
// Feed the same 100 random rows to both engines so their outputs can be compared.
t = table(rand(`A`B`C, 100) as sym, rand(100.0, 100) as val)
rse.append!(t)
rse2.append!(t)
t1 = select TOP 10 * from result ORDER BY sym
t2 = select TOP 10 * from result2 ORDER BY sym
table t1:
| sym | res | 
|---|---|
| A | 73.74901322182268 | 
| A | 151.10119895543903 | 
| A | 234.97340406756848 | 
| A | 249.01866489090025 | 
| A | 263.8336496660486 | 
| A | 277.98473422881216 | 
| A | 321.98982320260257 | 
| A | 394.077792391181 | 
| A | 442.3658183310181 | 
| A | 538.974173902534 | 
table t2:
| sym | res | 
|---|---|
| A | 73.74901322182268 | 
| A | 151.10119895543903 | 
| A | 234.97340406756848 | 
| A | 249.01866489090025 | 
| A | 263.8336496660486 | 
| A | 277.98473422881216 | 
| A | 321.98982320260257 | 
| A | 394.077792391181 | 
| A | 442.3658183310181 | 
| A | 538.974173902534 | 
Defining events in CEP engine
Defining event types as classes replaces the cumbersome dictionary-based definitions. This makes event definitions and their processing logic more intuitive, and helps developers write CEP applications more efficiently.
// Event class carrying one order: who placed it, where, for which code, at what price and quantity.
class orders{
    trader :: STRING
    market :: STRING
    code :: STRING
    price :: DOUBLE
    qty :: INT
    eventTime :: TIMESTAMP
    // Constructor: t/m/c/p/q populate trader/market/code/price/qty;
    // eventTime is stamped with the current time at construction.
    def orders(t, m, c, p, q){
        eventTime = now()
        qty = q
        price = p
        code = c
        market = m
        trader = t
    }
}
// Event class signalling that a trader has come online.
class traderOnline{
    trader :: STRING
    eventTime :: TIMESTAMP
    // Constructor: record the trader name `t` and stamp the event with the current time.
    def traderOnline(t){
        eventTime = now()
        trader = t
    }
}
// CEP monitor that listens for traderOnline and orders events and mirrors
// per-trader state into the 'traderDV' data view.
class mainMonitor:CEPMonitor{
    // Accumulates the fields of every orders event received.
    ordersTable :: ANY
    // Flag supplied at construction time.
    isBusy :: BOOL
    // Constructor: start with an empty order store and remember `busyFlag`.
    def mainMonitor(busyFlag){
        ordersTable = array(ANY, 0)
        isBusy = busyFlag
    }
    // Forward declaration: onload() refers to updateOrders before its definition below.
    def updateOrders(event)
    // Teardown callback: remove the shared table created in onload().
    // Named `onunload` (the monitor teardown callback); the previous spelling
    // `unOnload` is not a callback name, so the cleanup was never triggered.
    def onunload(){
        undef('traderDV', SHARED)
    }
    // Handler for traderOnline events: add a row for the trader to the data view
    // with zeroed counters, busy = false, and the event's date as orderDate.
    def online(online){
        getDataViewEngine(,'traderDV').append!(table(online.trader as trader, 0 as orderCount, 0 as tradeCount, 0.0 as price, false as busy, date(online.eventTime) as orderDate))
    }
    // Startup callback: register the event listeners, then create and share the
    // stream table 'traderDV' and a data view engine of the same name on top of it,
    // keyed on `trader` with `updateTime` as the time column.
    def onload(){
        addEventListener(online,'traderOnline',,'all')
        addEventListener(updateOrders, 'orders',,'all')
        traderDV = streamTable(array(STRING, 0) as trader,  array(INT, 0) as orderCount, array(INT, 0) as tradeCount, array(DOUBLE, 0) as price, array(BOOL, 0) as busy, array(DATE, 0) as orderDate, array(TIMESTAMP, 0) as updateTime)
        share(traderDV, 'traderDV')
        createDataViewEngine('traderDV', objByName('traderDV'), `trader, `updateTime, true)
    }
    // Handler for orders events: store the order's fields, then update the trader's
    // orderCount (set to the current size of ordersTable) and busy flag in the data view.
    def updateOrders(event) {
        ordersTable.append!([event.trader, event.market, event.code, event.price, event.qty])
        updateDataViewItems('traderDV', event.trader, ['orderCount', 'busy'], [ordersTable.size(), false])
    }
}
// Empty table handed to createCEPEngine as its dummy table: an event time, an event type name, and a BLOB column.
dummy = table(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs)
// Create CEP engine 'cep1' with mainMonitor(true) as its monitor and the orders / traderOnline classes as event types.
// NOTE(review): the trailing arguments 1, 'eventTime', 10000 are presumably deserializeParallelism, timeColumn and
// eventQueueDepth (10000 matches queueDepth in the stats output below) — confirm against the createCEPEngine reference.
engineCep = createCEPEngine('cep1', <mainMonitor(true)>, dummy, [orders, traderOnline], 1, 'eventTime', 10000)
// Send three traderOnline events. Note that the variable deliberately reuses the class name
// traderOnline, so these statements depend on running after createCEPEngine above.
traderOnline = traderOnline('t1')
appendEvent(engineCep, traderOnline)
traderOnline = traderOnline('t2')
appendEvent(engineCep, traderOnline)
traderOnline = traderOnline('t3')
appendEvent(engineCep, traderOnline)
// Send one orders event for trader 't1'.
orders1 = orders('t1', 'sz', 's001', 11.0, 10)
appendEvent(engineCep, orders1)
// Inspect the shared data view table and the engine statistics.
objByName(`traderDV)
getCEPEngineStat(`cep1)
/* output:
streamEngineStat->{
}
subEngineStat->
subEngineName eventsOnInp...monitorNumber listeners timers eventsRouted eventsSent eventsReceived...
------------- ------------- ---------------- --------- ------ ------------ ---------- ------------- 
cep1          4             1             2         0      0            0          4             ...
engineStat->{
eventsOnOutputQueue->0
lastErrorTimestamp->
numOfSubEngine->1
lastErrorMessage->
eventsEmitted->0
eventsReceived->4
name->cep1
queueDepth->10000
user->guest
useSystemTime->true
status->OK
}
eventSchema->
eventType    eventField                fieldType                 fieldTypeId        fieldFormId  
------------ ------------------------- ---------------------------- --------------------- -------------
orders       trader,market,code,pric...STRING,STRING,STRING,DO...[18,18,18,16,4,12] [0,0,0,0,0,0]
traderOnline trader,eventTime          STRING,TIMESTAMP          [18,12]            [0,0]        
dataViewEngines->
name     user  status lastErrorMes...lastErrorTim...keyColumns outputTableNameuseSystemTime throttle ...
-------- ----- ------ -------------- ----------------- ------------- -------------- ------------- -------- 
traderDV guest OK                                   trader     traderDV       true                   ...
*/