DStream::dualOwnershipReactiveStateEngine

Syntax

DStream::dualOwnershipReactiveStateEngine(metrics1, metrics2, keyColumn1, keyColumn2, [keyPurgeFilter1], [keyPurgeFilter2], [keyPurgeFreqInSecond=0])

Arguments

Only the parameters whose usage differs from that in reactiveStateEngine are explained here.

Data grouped by keyColumn1 is calculated with metrics1 and purged according to keyPurgeFilter1; data grouped by keyColumn2 is calculated with metrics2 and purged according to keyPurgeFilter2.
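
The two independent groupings can be pictured with an ordinary in-memory table. The sketch below is not the engine itself: it assumes each metric behaves like a batch calculation over its own key columns, and the table t, its values, and the result column names are hypothetical, chosen only for illustration.

```
// Hypothetical sample data, for illustration only.
t = table(`a`a`b`a`b as sym, ['X', 'Y', 'X', 'Y', 'Y'] as market, [10.0, 11.0, 12.0, 13.0, 14.0] as price)

// Counterpart of metrics1 with keyColumn1=`sym:
// the moving window is maintained separately for each sym.
update t set mfirst_by_sym = mfirst(price, 2) context by sym

// Counterpart of metrics2 with keyColumn2=`market:
// the moving window is maintained separately for each market,
// independently of the sym grouping above.
update t set mmax_by_market = mmax(price, 2) context by market

select * from t
```

Each row carries one result per grouping, so the same record can sit in a full window under one key and a partial window under the other.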

Examples

if (!existsCatalog("orca")) {
	createCatalog("orca")
}
go
use catalog orca

// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('dualOwnershipReactive')

g = createStreamGraph('dualOwnershipReactive')

g.source("trades", 1000:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT])
.dualOwnershipReactiveStateEngine(metrics1=<mfirst(price, 3)>, metrics2=<mmax(price, 3)>, keyColumn1=`date`sym, keyColumn2=`date`market)
.sink("output")
g.submit()
go

dates=take(2012.01.01, 10) join take(2012.01.02, 4)
times=[09:00:00.030, 09:00:00.030, 09:00:00.031, 09:00:00.031, 09:00:00.031, 09:00:00.033, 09:00:00.033, 09:00:00.034, 09:00:00.034, 09:00:00.035, 09:00:00.031, 09:00:00.032, 09:00:00.032, 09:00:00.040]
syms=[`a, `a, `b, `a, `a, `b, `a, `b, `b, `a, `b, `a, `b, `c]
markets=['B', 'B', 'A', 'B', 'A', 'B', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'B']
prices=[10.65, 10.59, 10.59, 10.65, 10.59, 10.59, 10.59, 10.59, 10.22, 11.0, 10.22, 11.0, 15.6, 13.2]
qtys=[1500, 2500, 2500, 1500, 2500, 2500, 2500, 2500, 1200, 2500, 1200, 2500, 1300, 2000]
tmp=table(dates as date, times as time, syms as sym, markets as market, prices as price, qtys as qty)

appendOrcaStreamTable("trades", tmp)
select * from orca_table.output
date       sym market mfirst_price mmax_price
---------- --- ------ ------------ ----------
2012.01.01 a   B
2012.01.01 a   B
2012.01.01 b   A
2012.01.01 a   B      10.65        10.65
2012.01.01 a   A      10.59
2012.01.01 b   B                   10.65
2012.01.01 a   A      10.65        10.59
2012.01.01 b   A      10.59        10.59
2012.01.01 b   A      10.59        10.59
2012.01.01 a   A      10.59        11