Use Cases
Developing operators for the reactive state engine
The OOP paradigm simplifies the development of operators. Traditionally, a custom
operator had to be written either with complex higher-order functions such as
genericIterateState or as a C++ plugin. With OOP, operator code becomes more
intuitive and easier to understand.
// Cumulative-sum operator: keeps a running total in the member `sum`.
class MyCumSum {
// Running total updated by append.
sum :: DOUBLE
// Constructor: starts the running total at 0.0.
def MyCumSum() {
sum = 0.0
}
// Adds `value` to the running total and returns the updated total.
def append(value) {
sum = sum + value
return sum
}
}
// Metric: pass each incoming `val` to MyCumSum.append and output the value it returns.
metrics = [<MyCumSum().append(val)>]

// Empty table describing the input schema, and the table that receives the engine output.
inputTable = table(1:0, ["sym", "val"], [SYMBOL, DOUBLE])
result = table(1000:0, ["sym", "res"], [SYMBOL, DOUBLE])

// Reactive state engine keyed by `sym`.
rse = createReactiveStateEngine(name="reactiveDemo", metrics=metrics, dummyTable=inputTable, outputTable=result, keyColumn="sym")

// Feed 100 random rows into the engine, then inspect the first 10 output rows.
data = table(rand(`A`B`C, 100) as sym, rand(100.0, 100) as val)
append!(rse, data)
select TOP 10 * from result
Returns:
sym | res |
---|---|
B | 23.136844485998154 |
C | 97.25657706148922 |
C | 119.75046650040895 |
B | 28.15541410818696 |
C | 208.6210786132142 |
A | 17.531023011542857 |
B | 127.66627178061754 |
C | 286.6357475752011 |
A | 45.87050362024456 |
C | 343.2471259729937 |
The constructor of a class can take parameters, but the arguments passed to it must be constants. For example:
// Drop the "reactiveDemo" engine created by the previous example so the name can be reused.
// The call is guarded so that this example also runs on its own, when no such engine exists.
try {
    dropStreamEngine("reactiveDemo")
} catch (exp) {
}
// Cumulative-sum operator whose starting value is supplied to the constructor.
class MyCumSum {
// Running total updated by append.
sum :: DOUBLE
// Constructor: starts the running total at `initialValue`.
def MyCumSum(initialValue) {
sum = initialValue
}
// Adds `value` to the running total and returns the updated total.
def append(value) {
sum = sum + value
return sum
}
}
// Metric: construct MyCumSum with the constant 0.0 as its initial value and
// output the value returned by append for each incoming `val`.
metrics = [<MyCumSum(0.0).append(val)>]

// Empty table describing the input schema, and the table that receives the engine output.
inputTable = table(1:0, ["sym", "val"], [SYMBOL, DOUBLE])
result = table(1000:0, ["sym", "res"], [SYMBOL, DOUBLE])

// Reactive state engine keyed by `sym`.
rse = createReactiveStateEngine(name="reactiveDemo", metrics=metrics, dummyTable=inputTable, outputTable=result, keyColumn="sym")

// Feed 100 random rows into the engine, then inspect the first 10 output rows.
data = table(rand(`A`B`C, 100) as sym, rand(100.0, 100) as val)
append!(rse, data)
select TOP 10 * from result
Returns:
sym | res |
---|---|
C | 79.73966575227678 |
A | 50.2432547044009 |
C | 108.56282587628812 |
A | 137.95565224718302 |
C | 179.22195203136653 |
B | 41.10089084133506 |
C | 253.5685545997694 |
A | 173.4938955400139 |
C | 262.32025355566293 |
B | 105.45554195996374 |
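In addition to columns, the arguments passed to append can be constants or lambda expressions.
The following example passes a lambda to append and compares the output with that of the built-in cumsum: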
// Accumulating operator whose combining function is supplied by the caller of append.
class MyCumSum {
// Accumulated state updated by append.
sum :: DOUBLE
// Constructor: obtains the initial state by calling a lambda that returns 0.
def MyCumSum() {
g = def (): 0
sum = g()
}
// Combines the current state with `value` using `f`, stores the result as the
// new state and returns it.
def append(value, f) {
sum = f(sum, value)
return sum
}
}
// Metric: MyCumSum with addition supplied as a lambda to append.
metrics = [<MyCumSum().append(val, def (a, b): a + b)>]

// Input schema plus one output table per engine.
inputTable = table(1:0, `sym`val, [SYMBOL, DOUBLE])
result = table(1000:0, `sym`res, [SYMBOL, DOUBLE])
result2 = table(1000:0, `sym`res, [SYMBOL, DOUBLE])

// Drop engines left over from earlier runs. Each drop has its own try block so
// that a failure to drop "reactiveDemo" (for example because it does not exist)
// does not skip dropping "reactiveDemo2".
try {
    dropStreamEngine("reactiveDemo")
} catch (exp) {
}
try {
    dropStreamEngine("reactiveDemo2")
} catch (exp) {
}

// Engine using the class-based operator.
rse = createReactiveStateEngine(
    name="reactiveDemo",
    metrics=metrics,
    dummyTable=inputTable,
    outputTable=result,
    keyColumn="sym")

// Reference engine using the built-in cumsum on the same column.
rse2 = createReactiveStateEngine(
    name="reactiveDemo2",
    metrics=[<cumsum(val)>],
    dummyTable=inputTable,
    outputTable=result2,
    keyColumn="sym")

// Feed the same random data to both engines so their outputs can be compared.
t = table(rand(`A`B`C, 100) as sym, rand(100.0, 100) as val)
rse.append!(t)
rse2.append!(t)
t1 = select TOP 10 * from result ORDER BY sym
t2 = select TOP 10 * from result2 ORDER BY sym
table t1:
sym | res |
---|---|
A | 73.74901322182268 |
A | 151.10119895543903 |
A | 234.97340406756848 |
A | 249.01866489090025 |
A | 263.8336496660486 |
A | 277.98473422881216 |
A | 321.98982320260257 |
A | 394.077792391181 |
A | 442.3658183310181 |
A | 538.974173902534 |
table t2:
sym | res |
---|---|
A | 73.74901322182268 |
A | 151.10119895543903 |
A | 234.97340406756848 |
A | 249.01866489090025 |
A | 263.8336496660486 |
A | 277.98473422881216 |
A | 321.98982320260257 |
A | 394.077792391181 |
A | 442.3658183310181 |
A | 538.974173902534 |
Defining events in the CEP engine
Defining event types as classes replaces the more cumbersome dictionary-based definitions. This makes both the definition of events and their processing logic more intuitive, and helps developers write CEP applications more efficiently.
// Event type for an order; its fields are trader, market, code, price, qty and eventTime.
class orders{
trader :: STRING
market :: STRING
code :: STRING
price :: DOUBLE
qty :: INT
eventTime :: TIMESTAMP
// Constructor: copies t, m, c, p, q into trader, market, code, price, qty
// and sets eventTime to the current time.
def orders(t, m, c, p, q){
trader = t
market = m
code = c
price = p
qty = q
eventTime = now()
}
}
// Event type carrying a trader name and an event time.
class traderOnline{
trader :: STRING
eventTime :: TIMESTAMP
// Constructor: stores the trader name `t` and sets eventTime to the current time.
def traderOnline(t){
trader = t
eventTime = now()
}
}
// Monitor that listens for traderOnline and orders events and keeps the
// 'traderDV' data view up to date.
class mainMonitor{
    // Accumulates the fields of every order received.
    ordersTable :: ANY
    // Flag supplied to the constructor.
    isBusy :: BOOL

    // Constructor: starts with an empty order list and stores the given flag.
    def mainMonitor(busyFlag){
        ordersTable = array(ANY, 0)
        isBusy = busyFlag
    }

    // Forward declaration: onload refers to updateOrders before its body appears below.
    def updateOrders(event)

    // Lifecycle callback run when the monitor is unloaded; removes the shared
    // 'traderDV' table created in onload. The callback must be named `onunload`
    // (the counterpart of `onload`) for the CEP engine to invoke it.
    def onunload(){
        undef('traderDV', SHARED)
    }

    // Handler for traderOnline events: adds a row for the trader to the data view
    // with zeroed counters and the date of the event.
    def online(online){
        getDataViewEngine(,'traderDV').append!(table(online.trader as trader, 0 as orderCount, 0 as tradeCount, 0.0 as price, false as busy, date(online.eventTime) as orderDate))
    }

    // Lifecycle callback: registers the event listeners, then creates and shares
    // the 'traderDV' table and builds the data view engine on top of it.
    def onload(){
        addEventListener(online,'traderOnline',,'all')
        addEventListener(updateOrders, 'orders',,'all')
        traderDV = streamTable(array(STRING, 0) as trader, array(INT, 0) as orderCount, array(INT, 0) as tradeCount, array(DOUBLE, 0) as price, array(BOOL, 0) as busy, array(DATE, 0) as orderDate, array(TIMESTAMP, 0) as updateTime)
        share(traderDV, 'traderDV')
        createDataViewEngine('traderDV', objByName('traderDV'), `trader, `updateTime, true)
    }

    // Handler for orders events: records the order fields, then updates the
    // trader's orderCount and busy items in the data view.
    def updateOrders(event) {
        ordersTable.append!([event.trader, event.market, event.code, event.price, event.qty])
        updateDataViewItems('traderDV', event.trader, ['orderCount', 'busy'], [ordersTable.size(), false])
    }
}
// Table describing the engine input: eventTime, eventType and blobs columns.
dummy = table(array(TIMESTAMP, 0) as eventTime, array(STRING, 0) as eventType, array(BLOB, 0) as blobs)

// CEP engine 'cep1' running mainMonitor(true) and accepting the two event types.
engineCep = createCEPEngine('cep1', <mainMonitor(true)>, dummy, [orders, traderOnline], 1, 'eventTime', 10000)

// Send three traderOnline events. The instances are stored under names that
// differ from the class name, so that `traderOnline(...)` always refers to the
// class constructor and never to a variable.
online1 = traderOnline('t1')
appendEvent(engineCep, online1)
online2 = traderOnline('t2')
appendEvent(engineCep, online2)
online3 = traderOnline('t3')
appendEvent(engineCep, online3)

// Send one order for trader 't1'.
orders1 = orders('t1', 'sz', 's001', 11.0, 10)
appendEvent(engineCep, orders1)

// Inspect the shared data view table and the engine statistics.
objByName(`traderDV)
getCEPEngineStat(`cep1)
/* output:
streamEngineStat->{
}
subEngineStat->
subEngineName eventsOnInp...monitorNumber listeners timers eventsRouted eventsSent eventsReceived...
------------- ------------- ---------------- --------- ------ ------------ ---------- -------------
cep1 4 1 2 0 0 0 4 ...
engineStat->{
eventsOnOutputQueue->0
lastErrorTimestamp->
numOfSubEngine->1
lastErrorMessage->
eventsEmitted->0
eventsReceived->4
name->cep1
queueDepth->10000
user->guest
useSystemTime->true
status->OK
}
eventSchema->
eventType eventField fieldType fieldTypeId fieldFormId
------------ ------------------------- ---------------------------- --------------------- -------------
orders trader,market,code,pric...STRING,STRING,STRING,DO...[18,18,18,16,4,12] [0,0,0,0,0,0]
traderOnline trader,eventTime STRING,TIMESTAMP [18,12] [0,0]
dataViewEngines->
name user status lastErrorMes...lastErrorTim...keyColumns outputTableNameuseSystemTime throttle ...
-------- ----- ------ -------------- ----------------- ------------- -------------- ------------- --------
traderDV guest OK trader traderDV true ...
*/