Dual-Ownership Reactive State Engine

The dual-ownership reactive state engine extends the reactive state engine and shares its calculation rules and triggering mechanism. The difference is that it lets you define two grouping rules (keyColumn1 and keyColumn2) for the same input data stream. The engine creates two independent calculation instances that process the same data in parallel, one per grouping, each with its own set of metrics. Each input record still produces exactly one output record, with the metrics for the two groupings stored in separate fields.

In real financial scenarios, you often need to calculate different metrics under different groupings of the same data. Compared with cascading multiple reactive state engines, the dual-ownership reactive state engine provides a more concise and efficient approach.

The dual-ownership reactive state engine is created via createDualOwnershipReactiveStateEngine. The syntax is as follows:

createDualOwnershipReactiveStateEngine(name, metrics1, metrics2, dummyTable, outputTable, keyColumn1, keyColumn2, [snapshotDir], [snapshotIntervalInMsgCount], [keyPurgeFilter1], [keyPurgeFilter2], [keyPurgeFreqInSecond=0], [raftGroup], [outputHandler], [msgAsTable=false])

The parameters of the dual-ownership reactive state engine are largely the same as those of createReactiveStateEngine. See createDualOwnershipReactiveStateEngine for details.

Calculation Rules

The engine outputs one result for each record inserted. The metrics in metrics1 are calculated within the groups defined by keyColumn1, and the metrics in metrics2 are calculated within the groups defined by keyColumn2. The results for the two groupings are stored in separate fields of the same output record.
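The rule above can be illustrated with a small model in plain Python. This is only a sketch of the semantics described here, not DolphinDB code or the engine's implementation: the function name dual_ownership_cumsum and the output labels BuyTotal and SellTotal are hypothetical, and the model assumes a single cumulative-sum metric for each grouping.

```python
from collections import defaultdict


def dual_ownership_cumsum(rows, key1, key2, value):
    """Emit one output row per input row, keeping an independent
    running sum of `value` for each key1 group and each key2 group."""
    state1 = defaultdict(float)  # running totals for the first grouping
    state2 = defaultdict(float)  # running totals for the second grouping
    out = []
    for row in rows:
        k1 = tuple(row[c] for c in key1)
        k2 = tuple(row[c] for c in key2)
        state1[k1] += row[value]
        state2[k2] += row[value]
        # Key columns first (duplicates removed), then one field per grouping
        record = {c: row[c] for c in dict.fromkeys(key1 + key2)}
        record["BuyTotal"] = state1[k1]   # hypothetical output label
        record["SellTotal"] = state2[k2]  # hypothetical output label
        out.append(record)
    return out


# Three made-up trades: buy order 1 and sell order 7 each appear twice
trades = [
    {"SecurityID": "A", "BuyNum": 1, "SellNum": 7, "TradeAmount": 100.0},
    {"SecurityID": "A", "BuyNum": 1, "SellNum": 8, "TradeAmount": 50.0},
    {"SecurityID": "A", "BuyNum": 2, "SellNum": 7, "TradeAmount": 25.0},
]
result = dual_ownership_cumsum(
    trades, ["SecurityID", "BuyNum"], ["SecurityID", "SellNum"], "TradeAmount"
)
for r in result:
    print(r)
```

In this made-up input, the second trade adds to the running total of buy order 1 but starts a new total for sell order 8, while the third trade starts a new total for buy order 2 and adds to sell order 7. The two totals in a record therefore differ whenever an order number repeats in only one of the groupings.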

Usage Examples

Calculate Cumulative Trade Value Based on Tick-by-Tick Trades in Real Time

Tick-by-tick trade data often needs to be aggregated separately by buy order number and by sell order number. This example groups tick-by-tick trades by buy order number and by sell order number, and calculates the cumulative trade value for each grouping.

// Define the table schema
colName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeBSFlag
colType = [SYMBOL, TIMESTAMP, DOUBLE, LONG, DOUBLE, LONG, LONG, SYMBOL]
share(streamTable(1000:0, colName, colType), `tickStream)
go
colName = `SecurityID`BuyNum`SellNum`TradeTime`BuyTotalAmount`SellTotalAmount
colType = [SYMBOL, LONG, LONG, TIMESTAMP, DOUBLE, DOUBLE]
share(streamTable(1000:0, colName, colType), `processOrder)
go
// Define the engine
dors = createDualOwnershipReactiveStateEngine(
    name="test", 
    metrics1=[<TradeTime>, <cumsum(TradeAmount)>], 
    metrics2=<cumsum(TradeAmount)>, 
    dummyTable=tickStream, 
    outputTable=processOrder, 
    keyColumn1=`SecurityID`BuyNum, 
    keyColumn2=`SecurityID`SellNum
)
// Subscribe to the stream table
subscribeTable(tableName=`tickStream, actionName="test", msgAsTable=true, handler=tableInsert{dors})
// Create a function to generate mock data
def generateMockTradeData(n, startTime) {
    // Generate basic data
    securities = `000001.SZ`000002.SZ`600000.SH`600036.SH
    sides = `B`S
    // Generate random trade data
    securityList = take(securities, n)
    timeList = startTime + (1..n) * 1000  // One record per second
    priceList = 10.0 + rand(100.0, n)  // Price between 10 and 110
    qtyList = rand(1000, n) + 100  // Quantity between 100 and 1099 shares
    amountList = priceList * qtyList  // Calculate trade amount
    buyNumList = rand(1000..10000, n)  // Buy order number
    sellNumList = rand(1000..10000, n)  // Sell order number
    bsFlagList = take(sides, n)  // Buy/Sell flag
    return table(
        securityList as SecurityID,
        timeList as TradeTime,
        priceList as TradePrice,
        qtyList as TradeQty,
        amountList as TradeAmount,
        buyNumList as BuyNum,
        sellNumList as SellNum,
        bsFlagList as TradeBSFlag
    )
}
// Generate and append mock data
mockData = generateMockTradeData(10, 2023.02.01T09:30:00.000)
tickStream.append!(mockData)
// The subscription is processed asynchronously, so wait briefly before querying
sleep(1000)
// Query the result
select * from processOrder

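The output is shown below. The mock order numbers and amounts are random, so the values differ between runs. In this run, every buy order number and every sell order number appears only once, so both cumulative totals equal the amount of the individual trade: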

SecurityID  BuyNum  SellNum  TradeTime                BuyTotalAmount  SellTotalAmount
000001.SZ   9,183   7,233    2023.02.01 09:30:01.000  24,564.96       24,564.96
000002.SZ   4,536   7,949    2023.02.01 09:30:02.000  6,315.87        6,315.87
600000.SH   4,132   8,528    2023.02.01 09:30:03.000  7,361.26        7,361.26
600036.SH   4,890   6,103    2023.02.01 09:30:04.000  60,081.05       60,081.05
000001.SZ   4,109   9,144    2023.02.01 09:30:05.000  6,610.39        6,610.39
000002.SZ   9,145   5,634    2023.02.01 09:30:06.000  48,972.04       48,972.04
600000.SH   8,980   5,341    2023.02.01 09:30:07.000  35,359.49       35,359.49
600036.SH   7,919   6,908    2023.02.01 09:30:08.000  15,604.20       15,604.20
000001.SZ   6,827   3,572    2023.02.01 09:30:09.000  13,989.02       13,989.02
000002.SZ   1,416   5,907    2023.02.01 09:30:10.000  80,293.77       80,293.77