Narrow Reactive State Engine
As competition in quantitative trading intensifies, quantitative investment teams need to process a vast number of factors, and the volume of factor data often far exceeds that of high-frequency market data. When storing factor data at this scale, narrow tables make adding, updating, and deleting factors more efficient than wide tables, so they are better suited to factor storage. Built on the reactive state engine, DolphinDB's narrow reactive state engine outputs results to narrow tables to meet this need. It also supports dynamic management of calculation metrics: the getReactiveMetrics function retrieves the factors currently specified in metrics, and the addReactiveMetrics function adds new metrics without interrupting your business.
Calculation Rules
The narrow reactive state engine follows the same calculation rules as the reactive state engine: each input record triggers one output. Unlike the reactive state engine, however, it returns a table in narrow format: the results of the metrics specified by metricNames are written to the same column in different rows.
- The narrow reactive state engine is created via createNarrowReactiveStateEngine. The syntax is as follows:
  createNarrowReactiveStateEngine(name, metrics, metricNames, dummyTable, outputTable, keyColumn, [filter], [snapshotDir], [snapshotIntervalInMsgCount], [keepOrder], [keyPurgeFilter], [keyPurgeFreqInSecond=0], [raftGroup], [outputElapsedMicroseconds=false], [keyCapacity=1024], [parallelism=1], [outputHandler=NULL], [msgAsTable=false])
  See createNarrowReactiveStateEngine for details.
- The metrics can be retrieved via getReactiveMetrics. The syntax is as follows:
  getReactiveMetrics(name)
  See getReactiveMetrics for details.
- To dynamically add calculation metrics to the narrow reactive state engine, you can use the addReactiveMetrics function. The syntax is as follows:
  addReactiveMetrics(name, metricNames, metrics)
  See addReactiveMetrics for details.
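The three functions above can be combined as in the following minimal sketch. The engine name "narrowDemo" and the variables demoInput, demoOutput, and demoEngine are hypothetical and exist only for this illustration. The sketch also reshapes the narrow output into a wide view with pivot by, to show how the two layouts relate.

```dolphindb
// Hypothetical names for illustration: "narrowDemo", demoInput, demoOutput, demoEngine
demoInput = streamTable(1:0, `sym`price`qty, [STRING, DOUBLE, INT])
demoOutput = streamTable(100:0, `sym`metricNames`factorValue, [STRING, STRING, DOUBLE])
demoEngine = createNarrowReactiveStateEngine(name="narrowDemo", metrics=[<cumavg(price)>, <cumsum(qty)>], metricNames=["cumAvgPx", "cumTotalQty"], dummyTable=demoInput, outputTable=demoOutput, keyColumn="sym")

// Inspect the metrics currently registered with the engine
getReactiveMetrics("narrowDemo")

// Register one more metric; it applies to data appended from now on
addReactiveMetrics("narrowDemo", "cumMaxPx", [<cummax(price)>])

// Feed one record per symbol
demoEngine.append!(table(["A", "B"] as sym, [10.0, 20.0] as price, [100, 200] as qty))

// Narrow view: all metric values share the factorValue column
select * from demoOutput

// Wide view of the same data: one row per symbol, one column per metric
select factorValue from demoOutput pivot by sym, metricNames

// Release the engine when it is no longer needed
dropStreamEngine("narrowDemo")
```

Dropping the demo engine at the end keeps its name from clashing with the engines created in the usage examples.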
Usage Examples
Example 1. Dynamically Add Calculation Metrics
Business scenarios often require you to dynamically add calculation metrics. The reactive state engine generally requires the calculation logic to be fully determined during initialization. In contrast, the narrow reactive state engine allows you to add metrics dynamically. After you update the metrics, the changes take effect for subsequent data streams without requiring an engine restart.
First, use the narrow reactive state engine to calculate the three metrics: cumulative average price, cumulative highest price, and cumulative trading volume. The calculation code is as follows:
dummy = streamTable(1:0, `sym`date`time`price`qty, [STRING,DATE,TIME,DOUBLE, INT])
outputTable = streamTable(10000:0, `sym`date`time`metricNames`factorValue, [STRING,DATE,TIME,STRING,DOUBLE])
// Define the calculation metrics. In this example, three metrics are defined: cumulative average price, cumulative highest price, and cumulative trading volume. You can modify the metrics based on your needs.
factor = [<date>, <time>, <cumavg(price)>, <cummax(price)>, <cumsum(qty)>]
Narrowtest = createNarrowReactiveStateEngine(name="narrowtest", metrics=factor, metricNames=["cumAvgPx","cumMaxPx","cumTotalQty"],dummyTable=dummy,outputTable=outputTable,keyColumn="sym")
// Mock data
num = 5
tmp = table(take("A" + lpad(string(1..4),4,"0"),num) as sym, take(2023.09.01,num) as date, 09:30:00.000 + 1000 * (1..num) as time, take(rand(100.0,num) join take(int(),30),num) as price, rand(50,num) as qty)
Narrowtest.append!(tmp)
The engine calculates the three metrics and writes them to the narrow table as consecutive rows that share the factorValue column. Because the mock data is random, your values will differ. An example of the results is shown below:
| sym | date | time | metricNames | factorValue |
|---|---|---|---|---|
| A0001 | 2023.09.01 | 09:30:01.000 | cumAvgPx | 90.83 |
| A0001 | 2023.09.01 | 09:30:01.000 | cumMaxPx | 90.83 |
| A0001 | 2023.09.01 | 09:30:01.000 | cumTotalQty | 802 |
| A0002 | 2023.09.01 | 09:30:02.000 | cumAvgPx | 7.88 |
| A0002 | 2023.09.01 | 09:30:02.000 | cumMaxPx | 7.88 |
| A0002 | 2023.09.01 | 09:30:02.000 | cumTotalQty | 448 |
| A0003 | 2023.09.01 | 09:30:03.000 | cumAvgPx | 67.40 |
| A0003 | 2023.09.01 | 09:30:03.000 | cumMaxPx | 67.40 |
| A0003 | 2023.09.01 | 09:30:03.000 | cumTotalQty | 80 |
| A0004 | 2023.09.01 | 09:30:04.000 | cumAvgPx | 32.14 |
| A0004 | 2023.09.01 | 09:30:04.000 | cumMaxPx | 32.14 |
| A0004 | 2023.09.01 | 09:30:04.000 | cumTotalQty | 11 |
| A0001 | 2023.09.01 | 09:30:05.000 | cumAvgPx | 86.33 |
| A0001 | 2023.09.01 | 09:30:05.000 | cumMaxPx | 90.83 |
| A0001 | 2023.09.01 | 09:30:05.000 | cumTotalQty | 1,351 |
The following example adds a cumulative maximum trading volume metric:
// Add cumulative maximum trading volume metric.
metrics = [<cummax(qty)>]
addReactiveMetrics("narrowtest", "cumMaxQty", metrics)
// After adding the new metric, append data into the engine to output the updated metric calculation results.
tmpNew = table("A0001" as sym, 2023.09.01 as date, 09:30:06.000 as time, 88.0 as price, 666 as qty)
Narrowtest.append!(tmpNew)
After the metric is added, the engine immediately applies it to subsequent data. The output:
| sym | date | time | metricNames | factorValue |
|---|---|---|---|---|
| A0001 | 2023.09.01 | 09:30:06.000 | cumAvgPx | 86.88 |
| A0001 | 2023.09.01 | 09:30:06.000 | cumMaxPx | 90.83 |
| A0001 | 2023.09.01 | 09:30:06.000 | cumTotalQty | 2,017 |
| A0001 | 2023.09.01 | 09:30:06.000 | cumMaxQty | 666 |
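To confirm that the new metric is registered and that it only affects later data, you could inspect the engine and the output table as sketched below. The sketch assumes the Example 1 code has already run in the same session, so that the engine "narrowtest" and the table outputTable exist. The variable name newMetricRows is hypothetical.

```dolphindb
// Assumes Example 1 has been executed in this session

// List the metrics currently registered with the engine
getReactiveMetrics("narrowtest")

// Rows for the new metric exist only for data appended after addReactiveMetrics
newMetricRows = select * from outputTable where metricNames = "cumMaxQty"
newMetricRows
```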
Example 2. Multi-Factor Narrow Table Calculation Based on Engine Cascade
DolphinDB's built-in streaming engines include the reactive state engine, time-series engine, cross-sectional engine, and anomaly detection engine, among others. All of these engines take tables as input and output, so multiple streaming engines can be cascaded into a data-flow topology for complex factor calculations. The narrow reactive state engine is suitable for downstream processing that relies on multi-factor narrow tables, for example, when downstream factor calculations depend on the latest values of multiple factors.
First, use the narrow reactive state engine to calculate the cumulative average price (factor1) and cumulative trading volume (factor2) for each product. Then, use the reactive stateless engine to adjust the cumulative average price of product_A by its share of the combined cumulative trading volume of product_A and product_B.
/**
@ If you have executed the code of example 1, use the following code to clean up the environment:
dropStreamEngine("narrowtest")
*/
share streamTable(1:0, `sym`date`time`price`qty, [STRING,DATE,TIME,DOUBLE, INT]) as tickStream
share streamTable(1:0, `sym`metricNames`factorValue, [STRING,STRING,DOUBLE]) as tempResultStream
result = table(100:0, `sym`metricNames`factorValue, [STRING,STRING,DOUBLE])
// Create metrics to describe the dependencies among the data
metrics = array(ANY, 0, 0)
metric = dict(STRING,ANY)
// Dependencies: product_A:factorNew=product_A:factor1*product_A:factor2/(product_A:factor2+product_B:factor2)
metric["outputName"] = `product_A:`factorNew
metric["formula"] = <A*B\(B+C)>
metric["A"] = `product_A:`factor1
metric["B"] = `product_A:`factor2
metric["C"] = `product_B:`factor2
metrics.append!(metric)
rsle = createReactiveStatelessEngine("reactiveStateless", metrics, result)
// Define calculation metrics. In this example, we define cumulative average price and cumulative trading volume. You can modify the metrics as needed.
factor = [<cumavg(price)>, <cumsum(qty)>]
Narrowtest = createNarrowReactiveStateEngine(name="narrowtest", metrics=factor, metricNames=["factor1","factor2"],dummyTable=tickStream,outputTable=tempResultStream,keyColumn="sym")
subscribeTable(tableName=`tickStream, actionName="factors", handler=tableInsert{Narrowtest})
subscribeTable(tableName=`tempResultStream, actionName="factorsNew", handler=tableInsert{rsle})
// Mock data
num = 10
tmp = table(take(`product_A`product_B, num) as sym, take(2023.09.01, num) as date, 09:30:00+(1..num) as time, rand(10.0, num) as price, rand(1000, num) as qty)
tickStream.append!(tmp)
The output:
| sym | metricNames | factorValue |
|---|---|---|
| product_A | factorNew | 7.938898 |
| product_A | factorNew | 2.399637 |
| product_A | factorNew | 2.710666 |
| product_A | factorNew | 2.440562 |
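Subscriptions are processed asynchronously, so the result table may not be populated the instant append! returns. The following sketch waits briefly before reading the result and then releases the resources created in Example 2. The one-second wait is an arbitrary assumption, not a recommended value.

```dolphindb
// Assumes Example 2 has been executed in this session

// Give the background subscriptions a moment to process the appended data
sleep(1000)
select * from result

// Stop both subscriptions created in Example 2
unsubscribeTable(tableName="tickStream", actionName="factors")
unsubscribeTable(tableName="tempResultStream", actionName="factorsNew")

// Release the two engines
dropStreamEngine("narrowtest")
dropStreamEngine("reactiveStateless")

// Remove the shared stream tables
undef(`tickStream, SHARED)
undef(`tempResultStream, SHARED)
```

Unsubscribing before dropping the engines and tables avoids leaving a subscription that points at an object that no longer exists.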
