DStream::dualOwnershipReactiveStateEngine
Syntax
DStream::dualOwnershipReactiveStateEngine(metrics1, metrics2, keyColumn1,
keyColumn2, [keyPurgeFilter1], [keyPurgeFilter2],
[keyPurgeFreqInSecond=0])
Details
Creates a dual-ownership reactive state streaming engine. For details, see createDualOwnershipReactiveStateEngine.
Return value: A DStream object.
Arguments
Only the parameters whose usage differs between dualOwnershipReactiveStateEngine and reactiveStateEngine are explained here.
Data grouped by keyColumn1 is calculated with metrics1 and purged according to keyPurgeFilter1; data grouped by keyColumn2 is calculated with metrics2 and purged according to keyPurgeFilter2.
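The dual grouping can be illustrated outside DolphinDB with a small Python sketch. It is not the engine's implementation, only a model of the semantics under stated assumptions: each metric keeps its own per-key sliding window, a window emits nothing until it holds 3 values (mirroring the empty cells in the example output), and key purging is not modeled. The helper names `moving` and `dual_ownership` are hypothetical. The input is the first five trades from the example on this page.

```python
from collections import defaultdict, deque

def moving(func, window):
    """Build a per-key updater that applies func to the last `window` values."""
    buffers = defaultdict(lambda: deque(maxlen=window))

    def update(key, value):
        buf = buffers[key]
        buf.append(value)
        # Assumption: emit None until the window is full.
        return func(buf) if len(buf) == window else None

    return update

def dual_ownership(rows, metric1, key1, metric2, key2):
    """Process rows in arrival order; each metric sees only its own grouping."""
    out = []
    for row in rows:
        k1 = tuple(row[c] for c in key1)  # group for metrics1
        k2 = tuple(row[c] for c in key2)  # group for metrics2
        out.append((metric1(k1, row["price"]), metric2(k2, row["price"])))
    return out

# First five trades of the example (all dated 2012.01.01).
rows = [
    {"date": "2012.01.01", "sym": "a", "market": "B", "price": 10.65},
    {"date": "2012.01.01", "sym": "a", "market": "B", "price": 10.59},
    {"date": "2012.01.01", "sym": "b", "market": "A", "price": 10.59},
    {"date": "2012.01.01", "sym": "a", "market": "B", "price": 10.65},
    {"date": "2012.01.01", "sym": "a", "market": "A", "price": 10.59},
]

mfirst3 = moving(lambda buf: buf[0], 3)  # stand-in for mfirst(price, 3)
mmax3 = moving(max, 3)                   # stand-in for mmax(price, 3)

result = dual_ownership(rows, mfirst3, ["date", "sym"], mmax3, ["date", "market"])
for mfirst_price, mmax_price in result:
    print(mfirst_price, mmax_price)
```

In the fifth row, the (date, sym) group for `a` already holds three prices, so the first metric produces a value, while the (date, market) group for `A` holds only two, so the second metric is still empty. The two groupings keep separate state.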
Examples
if (!existsCatalog("orca")) {
    createCatalog("orca")
}
go
use catalog orca
// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('dualOwnershipReactive')
g = createStreamGraph('dualOwnershipReactive')
g.source("trades", 1000:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT])
    .dualOwnershipReactiveStateEngine(metrics1=<mfirst(price, 3)>, metrics2=<mmax(price, 3)>, keyColumn1=`date`sym, keyColumn2=`date`market)
    .sink("output")
g.submit()
go
dates=take(2012.01.01, 10) join take(2012.01.02, 4)
times=[09:00:00.030, 09:00:00.030, 09:00:00.031, 09:00:00.031, 09:00:00.031, 09:00:00.033, 09:00:00.033, 09:00:00.034, 09:00:00.034, 09:00:00.035, 09:00:00.031, 09:00:00.032, 09:00:00.032, 09:00:00.040]
syms=[`a, `a, `b, `a, `a, `b, `a, `b, `b, `a, `b, `a, `b, `c]
markets=['B', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'B']
prices=[10.65, 10.59, 10.59, 10.65, 10.59, 10.59, 10.59, 10.59, 10.22, 11.0, 10.22, 11.0, 15.6, 13.2]
qtys=[1500, 2500, 2500, 1500, 2500, 2500, 2500, 2500, 1200, 2500, 1200, 2500, 1300, 2000]
tmp=table(dates as date, times as time, syms as sym, markets as market, prices as price, qtys as qty)
appendOrcaStreamTable("trades", tmp)
select * from orca_table.output
date | sym | market | mfirst_price | mmax_price |
---|---|---|---|---|
2012.01.01 | a | B | | |
2012.01.01 | a | B | | |
2012.01.01 | b | A | | |
2012.01.01 | a | B | 10.65 | 10.65 |
2012.01.01 | a | A | 10.59 | |
2012.01.01 | b | B | | 10.65 |
2012.01.01 | a | A | 10.65 | 10.59 |
2012.01.01 | b | A | 10.59 | 10.59 |
2012.01.01 | b | A | 10.59 | 10.59 |
2012.01.01 | a | A | 10.59 | 11 |