createDualOwnershipReactiveStateEngine
Syntax
createDualOwnershipReactiveStateEngine(name, metrics1, metrics2, dummyTable,
outputTable, keyColumn1, keyColumn2, [snapshotDir],
[snapshotIntervalInMsgCount], [keyPurgeFilter1], [keyPurgeFilter2],
[keyPurgeFreqInSecond=0], [raftGroup], [outputHandler],
[msgAsTable=false])
Details
The dual-ownership reactive state streaming engine extends the reactive state streaming engine: it computes two sets of metrics in parallel over the same stream table, each set with its own grouping. Compared with cascading two reactive state streaming engines, it offers considerably better computing performance.
- Output records are written in the same order as the input, i.e., the engine always behaves as if keepOrder is set to true.
- Out-of-order processing is not supported. The input data must already be ordered.
Arguments
Only the parameters whose usage differs from createReactiveStateEngine are explained here.
Data grouped by keyColumn1 is calculated based on metrics1 and purged based on keyPurgeFilter1; data grouped by keyColumn2 is calculated based on metrics2 and purged based on keyPurgeFilter2.
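As a minimal sketch of per-group purging, the call below attaches one purge filter to each grouping. It assumes that keyPurgeFilter1 and keyPurgeFilter2 follow the same rules as keyPurgeFilter in createReactiveStateEngine (metacode conditions on columns of the output table, checked every keyPurgeFreqInSecond seconds). The engine name, table names, date threshold, and 10-second frequency are illustrative only.

```dolphindb
// Hypothetical schema tables; the names are for illustration only.
purgeInput = table(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT])
purgeOutput = table(100:0, `date`sym`market`factor1`factor2, [DATE, SYMBOL, CHAR, DOUBLE, DOUBLE])

// Assumption: each filter applies only to the state of its own grouping
// (keyPurgeFilter1 to keyColumn1 groups, keyPurgeFilter2 to keyColumn2 groups).
purgeEngine = createDualOwnershipReactiveStateEngine(name="purgeDemo", metrics1=<mfirst(price, 3)>, metrics2=<mmax(price, 3)>, dummyTable=purgeInput, outputTable=purgeOutput, keyColumn1=`date`sym, keyColumn2=`date`market, keyPurgeFilter1=<date < 2012.01.02>, keyPurgeFilter2=<date < 2012.01.02>, keyPurgeFreqInSecond=10)

// Clean up the illustrative engine.
dropStreamEngine("purgeDemo")
```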
outputTable specifies the output table. It can be an in-memory table or a DFS table. Create an empty table and specify the names and data types of the columns before calling the function.
The output columns are in the following order:
(1) The columns common to keyColumn1 and keyColumn2;
(2) The remaining columns of keyColumn1 and keyColumn2;
(3) The result columns of metrics1 and metrics2.
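The ordering rule can be illustrated with the key columns used in the Examples section. This is only a sketch: the table name orderedOutput and the result column names factor1 and factor2 are chosen for illustration.

```dolphindb
// With keyColumn1 = `date`sym and keyColumn2 = `date`market:
// (1) common key column:      date
// (2) remaining key columns:  sym (from keyColumn1), market (from keyColumn2)
// (3) metric results:         factor1 (metrics1), factor2 (metrics2)
orderedOutput = table(100:0, `date`sym`market`factor1`factor2, [DATE, SYMBOL, CHAR, DOUBLE, DOUBLE])
```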
outputHandler (optional) is a unary function or a partial function with a single unfixed parameter. If set, the engine will not write the calculation results to the output table directly. Instead, the results will be passed as a parameter to the outputHandler function.
msgAsTable (optional) is a Boolean scalar indicating whether the output data is passed to the function specified by outputHandler as a table or as a tuple. If msgAsTable=true, the output data is passed to the function as a table. The default value is false, which means the output data is passed to the function as a tuple of columns.
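A minimal sketch of how outputHandler and msgAsTable might be combined: the engine hands each batch of results, as a table, to a partial application of append! that writes into a separate shared table. All names here (handlerInput, handlerOutput, handled, "handlerDemo") are hypothetical, and the sketch assumes the handler is invoked the same way as in createReactiveStateEngine.

```dolphindb
// Hypothetical names throughout; the schema mirrors the Examples section.
handlerInput = table(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT])
// outputTable is still a required argument, but results are not written
// to it directly once outputHandler is set.
handlerOutput = table(100:0, `date`sym`market`factor1`factor2, [DATE, SYMBOL, CHAR, DOUBLE, DOUBLE])
share table(100:0, `date`sym`market`factor1`factor2, [DATE, SYMBOL, CHAR, DOUBLE, DOUBLE]) as handled

// append!{handled} is a partial function with a single unfixed parameter;
// with msgAsTable=true that parameter receives the results as a table.
handlerEngine = createDualOwnershipReactiveStateEngine(name="handlerDemo", metrics1=<mfirst(price, 3)>, metrics2=<mmax(price, 3)>, dummyTable=handlerInput, outputTable=handlerOutput, keyColumn1=`date`sym, keyColumn2=`date`market, outputHandler=append!{handled}, msgAsTable=true)

// Feed a few rows directly into the engine.
batch = table(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT])
insert into batch values(2012.01.01, 09:00:00.030, `a, 'B', 10.65, 1500)
insert into batch values(2012.01.01, 09:00:00.031, `a, 'B', 10.59, 2500)
insert into batch values(2012.01.01, 09:00:00.032, `a, 'B', 10.65, 1500)
tableInsert(handlerEngine, batch)

// The results are expected in the handler's target table.
select * from handled

// Clean up the illustrative objects.
dropStreamEngine("handlerDemo")
undef(`handled, SHARED)
```

Routing results through a handler is useful when they need post-processing or must go somewhere other than a single table; for plain storage, writing to outputTable directly is simpler.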
Examples
share streamTable(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT]) as trades
outputTable = table(100:0, `date`sym`market`factor1`factor2, [DATE, SYMBOL, CHAR, DOUBLE, DOUBLE])
dors = createDualOwnershipReactiveStateEngine(name="test", metrics1=<mfirst(price, 3)>, metrics2=<mmax(price, 3)>, dummyTable=trades, outputTable=outputTable, keyColumn1=`date`sym, keyColumn2=`date`market)
tmp = table(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT])
subscribeTable(tableName=`trades, actionName="test",msgAsTable=true, handler=tableInsert{dors})
insert into tmp values(2012.01.01, 09:00:00.030, `a, 'B', 10.65, 1500)
insert into tmp values(2012.01.01, 09:00:00.030, `a, 'B', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.031, `b, 'A', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.031, `a, 'B', 10.65, 1500)
insert into tmp values(2012.01.01, 09:00:00.031, `a, 'A', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.033, `b, 'B', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.033, `a, 'A', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.034, `b, 'A', 10.59, 2500)
insert into tmp values(2012.01.01, 09:00:00.034, `b, 'A', 10.22, 1200)
insert into tmp values(2012.01.01, 09:00:00.035, `a, 'A', 11.0, 2500)
insert into tmp values(2012.01.02, 09:00:00.031, `b, 'A', 10.22, 1200)
insert into tmp values(2012.01.02, 09:00:00.032, `a, 'B', 11.0, 2500)
insert into tmp values(2012.01.02, 09:00:00.032, `b, 'B', 15.6, 1300)
insert into tmp values(2012.01.02, 09:00:00.040, `c, 'B', 13.2, 2000)
trades.append!(tmp)
select * from outputTable
| date | sym | market | factor1 | factor2 |
|---|---|---|---|---|
| 2012.01.01 | a | 'B' |  |  |
| 2012.01.01 | a | 'B' |  |  |
| 2012.01.01 | b | 'A' |  |  |
| 2012.01.01 | a | 'B' | 10.65 | 10.65 |
| 2012.01.01 | a | 'A' | 10.59 |  |
| 2012.01.01 | b | 'B' |  | 10.65 |
| 2012.01.01 | a | 'A' | 10.65 | 10.59 |
| 2012.01.01 | b | 'A' | 10.59 | 10.59 |
| 2012.01.01 | b | 'A' | 10.59 | 10.59 |
| 2012.01.01 | a | 'A' | 10.59 | 11 |
| 2012.01.02 | b | 'A' |  |  |
| 2012.01.02 | a | 'B' |  |  |
| 2012.01.02 | b | 'B' |  |  |
| 2012.01.02 | c | 'B' |  | 15.6 |
unsubscribeTable(tableName=`trades, actionName="test")
undef(`trades,SHARED)
dropStreamEngine("test")
