createDualOwnershipReactiveStateEngine

Syntax

createDualOwnershipReactiveStateEngine(name, metrics1, metrics2, dummyTable, outputTable, keyColumn1, keyColumn2, [snapshotDir], [snapshotIntervalInMsgCount], [keyPurgeFilter1], [keyPurgeFilter2], [keyPurgeFreqInSecond=0], [raftGroup])

Arguments

Only the parameters whose usage differs from createReactiveStateEngine are explained here.

Data grouped by keyColumn1 is calculated with metrics1 and purged according to keyPurgeFilter1.

Data grouped by keyColumn2 is calculated with metrics2 and purged according to keyPurgeFilter2.

outputTable is the output table. It can be an in-memory table or a DFS table. Create an empty table and specify the names and data types of its columns before calling the function.

The output columns are in the following order:

(1) The columns shared by keyColumn1 and keyColumn2

(2) The remaining columns of keyColumn1 and keyColumn2

(3) The result columns of metrics1 and metrics2
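The ordering rule above can be sketched in a few lines of Python. This is only an illustration of the rule, not DolphinDB code: the helper name output_columns is hypothetical, and the sketch assumes that the remaining key columns of keyColumn1 come before those of keyColumn2, which matches the outputTable schema used in the Examples section.

```python
def output_columns(key_column1, key_column2, result_columns):
    """Return the expected output column order (illustrative helper, not a DolphinDB API)."""
    # (1) columns that appear in both key lists
    common = [c for c in key_column1 if c in key_column2]
    # (2) remaining key columns; assumption: keyColumn1's come first, then keyColumn2's
    others = [c for c in key_column1 + key_column2 if c not in common]
    # (3) result columns of metrics1 followed by metrics2
    return common + others + list(result_columns)


cols = output_columns(["date", "sym"], ["date", "market"], ["factor1", "factor2"])
print(cols)  # ['date', 'sym', 'market', 'factor1', 'factor2']
```

The resulting order is the one the output table must be created with before the engine is defined.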

Details

The dual-ownership reactive state streaming engine extends the reactive state streaming engine with support for parallel computation over two different groupings of the same stream table, each with its own metrics. Compared with cascading two reactive state streaming engines, it greatly improves computing performance.

Note: The output table keeps the same order as the input, i.e., keepOrder is set to true in the engine.
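To make the two-grouping behavior concrete, here is a minimal pure-Python emulation of the semantics. It is not the engine's implementation and uses no DolphinDB API. It mirrors the metrics used in the Examples section (a moving first value per (date, sym) and a moving maximum per (date, market), both over 3 rows). The function name dual_ownership is hypothetical, and the sketch assumes a result is produced only once a group's window holds 3 rows.

```python
from collections import defaultdict, deque

WINDOW = 3  # same window length as mfirst(price, 3) and mmax(price, 3)


def dual_ownership(rows, window=WINDOW):
    """Process rows in input order, keeping one sliding window per group for each key set."""
    by_sym = defaultdict(lambda: deque(maxlen=window))     # keyColumn1: (date, sym)
    by_market = defaultdict(lambda: deque(maxlen=window))  # keyColumn2: (date, market)
    out = []
    for date, sym, market, price in rows:
        w1 = by_sym[(date, sym)]
        w2 = by_market[(date, market)]
        w1.append(price)
        w2.append(price)
        # Assumption: no value is emitted until the window is full.
        factor1 = w1[0] if len(w1) == window else None    # moving first
        factor2 = max(w2) if len(w2) == window else None  # moving max
        # One output row per input row, so the input order is preserved.
        out.append((date, sym, market, factor1, factor2))
    return out


rows = [
    ("2012.01.01", "a", "B", 10.65),
    ("2012.01.01", "a", "B", 10.59),
    ("2012.01.01", "b", "A", 10.59),
    ("2012.01.01", "a", "B", 10.65),
    ("2012.01.01", "a", "A", 10.59),
    ("2012.01.01", "b", "B", 10.59),
]
result = dual_ownership(rows)
for r in result:
    print(r)
```

Each input row updates two independent states, one per key set, and yields exactly one output row. In this sample the fourth row is the first to have a full window in both groupings.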

Examples

$ share streamTable(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT]) as trades
$ outputTable = table(100:0, `date`sym`market`factor1`factor2, [DATE, SYMBOL, CHAR, DOUBLE, DOUBLE])
$ dors = createDualOwnershipReactiveStateEngine(name="test", metrics1=<mfirst(price, 3)>, metrics2=<mmax(price, 3)>, dummyTable=trades, outputTable=outputTable, keyColumn1=`date`sym, keyColumn2=`date`market)
$ tmp = table(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT])
$ subscribeTable(tableName=`trades, actionName="test",msgAsTable=true, handler=tableInsert{dors})
$ insert into tmp values(2012.01.01, 09:00:00.030, `a, 'B', 10.65, 1500)
$ insert into tmp values(2012.01.01, 09:00:00.030, `a, 'B', 10.59, 2500)
$ insert into tmp values(2012.01.01, 09:00:00.031, `b, 'A', 10.59, 2500)
$ insert into tmp values(2012.01.01, 09:00:00.031, `a, 'B', 10.65, 1500)
$ insert into tmp values(2012.01.01, 09:00:00.031, `a, 'A', 10.59, 2500)
$ insert into tmp values(2012.01.01, 09:00:00.033, `b, 'B', 10.59, 2500)
$ insert into tmp values(2012.01.01, 09:00:00.033, `a, 'A', 10.59, 2500)
$ insert into tmp values(2012.01.01, 09:00:00.034, `b, 'A', 10.59, 2500)
$ insert into tmp values(2012.01.01, 09:00:00.034, `b, 'A', 10.22, 1200)
$ insert into tmp values(2012.01.01, 09:00:00.035, `a, 'A', 11.0, 2500)
$ insert into tmp values(2012.01.02, 09:00:00.031, `b, 'A', 10.22, 1200)
$ insert into tmp values(2012.01.02, 09:00:00.032, `a, 'B', 11.0, 2500)
$ insert into tmp values(2012.01.02, 09:00:00.032, `b, 'B', 15.6, 1300)
$ insert into tmp values(2012.01.02, 09:00:00.040, `c, 'B', 13.2, 2000)
$ trades.append!(tmp)
$ select * from outputTable

date        sym  market  factor1  factor2
2012.01.01  a    'B'
2012.01.01  a    'B'
2012.01.01  b    'A'
2012.01.01  a    'B'     10.65    10.65
2012.01.01  a    'A'     10.59
2012.01.01  b    'B'              10.65
2012.01.01  a    'A'     10.65    10.59
2012.01.01  b    'A'     10.59    10.59
2012.01.01  b    'A'     10.59    10.59
2012.01.01  a    'A'     10.59    11
2012.01.02  b    'A'
2012.01.02  a    'B'
2012.01.02  b    'B'
2012.01.02  c    'B'              15.6

$ unsubscribeTable(tableName=`trades, actionName="test")
$ undef(`trades,SHARED)
$ dropStreamEngine("test")