Dual-Ownership Reactive State Engine
The dual-ownership reactive state engine extends the reactive state engine, sharing the same calculation rules and triggering mechanism. The difference is that the dual-ownership reactive state engine allows you to define two grouping rules (i.e., two grouping columns) for the same input data stream. The engine creates two independent calculation instances to process the corresponding set of metrics for each grouping in parallel on the same data. Each input still corresponds to a single output, with the metrics for different groupings stored in separate fields.
In real financial scenarios, you often need to apply different metrics for calculations based on various grouping methods. Compared with solutions achieved by cascading reactive state engines, the dual-ownership reactive state engine provides a more concise and efficient approach.
The dual-ownership reactive state engine is created via
createDualOwnershipReactiveStateEngine. The syntax is as follows:
createDualOwnershipReactiveStateEngine(name, metrics1, metrics2, dummyTable, outputTable, keyColumn1, keyColumn2, [snapshotDir], [snapshotIntervalInMsgCount], [keyPurgeFilter1], [keyPurgeFilter2], [keyPurgeFreqInSecond=0], [raftGroup], [outputHandler], [msgAsTable=false])
Parameters of the dual-ownership reactive state engine are mostly consistent with
createReactiveStateEngine. See createDualOwnershipReactiveStateEngine for details.
Calculation Rules
A result is output whenever data is inserted. For data grouped by keyColumn1, calculations are based on metrics1; for data grouped by keyColumn2, calculations are based on metrics2. Metrics for different groups are stored in separate fields.
Usage Examples
Calculate C umulative Trade Value Based on Tick-by-Tick Trades in Real Time
In real financial scenarios, you often need to aggregate and calculate metrics based on tick-by-tick trades separately by buy order number and sell order number. This example aggregates trade values in tick-by-tick data by buy and sell order numbers, calculating the corresponding cumulative trading values.
// Define the table schema
colName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeBSFlag
colType = [SYMBOL, TIMESTAMP, DOUBLE, LONG, DOUBLE, LONG, LONG, SYMBOL]
share(streamTable(1000:0, colName, colType), `tickStream)
go
colName = `SecurityID`BuyNum`SellNum`TradeTime`BuyTotalAmount`SellTotalAmount
colType = [SYMBOL, LONG, LONG, TIMESTAMP, DOUBLE, DOUBLE]
share(streamTable(1000:0, colName, colType), `processOrder)
go
// Define the engine
dors = createDualOwnershipReactiveStateEngine(
name="test",
metrics1=[<TradeTime>, <cumsum(TradeAmount)>],
metrics2=<cumsum(TradeAmount)>,
dummyTable=tickStream,
outputTable=processOrder,
keyColumn1=`SecurityID`BuyNum,
keyColumn2=`SecurityID`SellNum
)
// Subscribe to the stream table
subscribeTable(tableName=`tickStream, actionName="test", msgAsTable=true, handler=tableInsert{dors})
// Create a function to generate mock data
def generateMockTradeData(n, startTime) {
// Generate basic data
securities = `000001.SZ`000002.SZ`600000.SH`600036.SH
sides = `B`S
// Generate random trade data
securityList = take(securities, n)
timeList = startTime + 1..n * 1000 // One data per second
priceList = 10.0 + rand(100.0, n) // Price between 10 and 110
qtyList = rand(1000, n) + 100 // Quantity between 100 and 1100 shares
amountList = priceList * qtyList // Calculate trade amount
buyNumList = rand(1000..10000, n) // Buy order number
sellNumList = rand(1000..10000, n) // Sell order number
bsFlagList = take(sides, n) // Buy/Sell flag
return table(
securityList as SecurityID,
timeList as TradeTime,
priceList as TradePrice,
qtyList as TradeQty,
amountList as TradeAmount,
buyNumList as BuyNum,
sellNumList as SellNum,
bsFlagList as TradeBSFlag
)
}
// Generate and append mock data
mockData = generateMockTradeData(10, 2023.02.01T09:30:00.000)
tickStream.append!(mockData)
// Query the result
select * from processOrder
The output:
| SecurityID | BuyNum | SellNum | TradeTime | BuyTotalAmount | SellTotalAmount |
|---|---|---|---|---|---|
| 000001.SZ | 9,183 | 7,233 | 2023.02.01 09:30:01.000 | 24,564.96 | 24,564.96 |
| 000002.SZ | 4,536 | 7,949 | 2023.02.01 09:30:02.000 | 6,315.87 | 6,315.87 |
| 600000.SH | 4,132 | 8,528 | 2023.02.01 09:30:03.000 | 7,361.26 | 7,361.26 |
| 600036.SH | 4,890 | 6,103 | 2023.02.01 09:30:04.000 | 60,081.05 | 60,081.05 |
| 000001.SZ | 4,109 | 9,144 | 2023.02.01 09:30:05.000 | 6,610.39 | 6,610.39 |
| 000002.SZ | 9,145 | 5,634 | 2023.02.01 09:30:06.000 | 48,972.04 | 48,972.04 |
| 600000.SH | 8,980 | 5,341 | 2023.02.01 09:30:07.000 | 35,359.49 | 35,359.49 |
| 600036.SH | 7,919 | 6,908 | 2023.02.01 09:30:08.000 | 15,604.20 | 15,604.20 |
| 000001.SZ | 6,827 | 3,572 | 2023.02.01 09:30:09.000 | 13,989.02 | 13,989.02 |
| 000002.SZ | 1,416 | 5,907 | 2023.02.01 09:30:10.000 | 80,293.77 | 80,293.77 |
