DStream::narrowReactiveStateEngine

Syntax

DStream::narrowReactiveStateEngine(metrics, metricNames, keyColumn, [filter], [keepOrder], [keyPurgeFilter], [keyPurgeFreqInSecond=0], [keyCapacity=1024], [parallelism=1])

Details

Creates a reactive state engine that returns a table in narrow format. For details, see createNarrowReactiveStateEngine.

Return value: A DStream object.

Arguments

metrics is metacode or a tuple of metacode. It can contain columns from the input table other than keyColumn (optional) and must contain factors, that is, the formulas for calculation (required).

metricNames is a STRING scalar or vector indicating the name of each factor specified in metrics. The number and order of the names must match those of the factors specified in metrics.

keyColumn (optional) is a STRING scalar/vector indicating the grouping column(s). The calculation is conducted within each group.

filter (optional) is metacode that specifies the filtering conditions. Each condition must be an expression that refers only to columns of dummyTable (here, the input table). Multiple conditions can be combined with the logical operators and and or. Only the results that satisfy the filter conditions are written to the output table.
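As a minimal sketch of how a filter could be passed, the script below reuses the source schema from the Examples section and adds a condition on two input columns. The graph name engineFiltered, the source name tradesFiltered, the sink name outputFiltered, and the thresholds are illustrative assumptions, and the script assumes the orca catalog is already in use.

```dolphindb
// Assumption: "use catalog orca" has already been run, as in the Examples section.
// All graph, source, and sink names below are illustrative, not part of the API.
g = createStreamGraph('engineFiltered')

// Conditions refer only to input columns and are combined with "and".
cond = <upToDatePrice > 0 and qty >= 100>

// Two pass-through columns followed by the two factors named in metricNames.
factor = [<createTime>, <updateTime>, <cumsum(qty)>, <cumavg(upToDatePrice)>]

g.source("tradesFiltered", 1000:0, ["securityID1","securityID2","securityID3","createTime","updateTime","upToDatePrice","qty","value"], [STRING,STRING,STRING,TIMESTAMP,TIMESTAMP,DOUBLE,DOUBLE,INT])
.narrowReactiveStateEngine(metrics=factor, metricNames=["factor1","factor2"], keyColumn=["securityID1","securityID2","securityID3"], filter=cond)
.sink("outputFiltered")
g.submit()
```

With this setup, only results that satisfy both conditions would be expected to reach outputFiltered.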

keepOrder (optional) specifies whether to preserve the insertion order of the records in the output table. The default value is true if keyColumn contains a time column, and false otherwise.

To clean up data that is no longer needed after calculation, specify the parameters keyPurgeFilter and keyPurgeFreqInSecond.

keyPurgeFilter (optional) indicates the filtering conditions that identify the data to be purged from the cache. It is metacode composed of conditional expressions, and these expressions must refer to the columns in the outputTable. keyPurgeFilter is effective only when keyColumn is specified.

keyPurgeFreqInSecond (optional) is a positive integer indicating the time interval (in seconds) to trigger a purge. keyPurgeFreqInSecond is effective only when keyColumn is specified.

For each data ingestion, the engine starts a purge if all of the following conditions are satisfied:

(1) The time elapsed since the last data ingestion is equal to or greater than keyPurgeFreqInSecond. For the first check, the time elapsed between the creation of the engine and the ingestion of data is used.

(2) With the first condition satisfied, the engine applies keyPurgeFilter to the cached data and finds data to be purged.

(3) The number of groups that contain data to be purged is equal to or greater than 10% of the total number of groups in the engine.
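The decision described by these conditions can be sketched as a small helper function. This is only an illustration of the documented rules, not the engine's actual implementation; the function name shouldPurge and its parameters are hypothetical, and the counts are assumed to be supplied by the caller.

```dolphindb
// Hypothetical helper that mirrors the documented purge conditions.
// elapsedSeconds:   time since the reference point used for the check
// purgeFreq:        the value of keyPurgeFreqInSecond
// numGroupsToPurge: groups containing data matched by keyPurgeFilter
// numGroupsTotal:   all groups currently held by the engine
def shouldPurge(elapsedSeconds, purgeFreq, numGroupsToPurge, numGroupsTotal) {
	// Condition (1): the purge interval must have elapsed.
	if (elapsedSeconds < purgeFreq) {
		return false
	}
	// Condition (3): at least 10% of all groups must contain purgeable data.
	// Integer arithmetic avoids floating-point comparison issues.
	return numGroupsToPurge * 10 >= numGroupsTotal
}

shouldPurge(30, 60, 50, 100)   // interval not reached, so no purge
shouldPurge(60, 60, 5, 100)    // only 5% of the groups qualify, so no purge
shouldPurge(60, 60, 10, 100)   // interval reached and 10% qualify, so a purge starts
```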

To check the engine status before and after a purge, call getStreamEngineStat().ReactiveStreamEngine (see getStreamEngineStat), where the numGroups field indicates the number of groups in the reactive state streaming engine.
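A minimal sketch of such a check is shown below. It assumes the call is made on the node where the engine runs and that the returned table has a name column alongside numGroups; comparing the numGroups value before and after a purge shows how many groups were removed.

```dolphindb
// Fetch the status table for reactive state engines on the current node.
stat = getStreamEngineStat().ReactiveStreamEngine

// Assumption: the table exposes a "name" column in addition to numGroups.
select name, numGroups from stat
```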

keyCapacity (optional) is a positive integer indicating the amount of memory, measured in rows, allocated for buffering the state of each group (defined by keyColumn). The default value is 1024. For data with a large number of groups, setting this parameter appropriately can reduce latency.

parallelism (optional) is a positive integer no greater than 63, indicating the maximum number of workers that can run in parallel. The default value is 1. For large computation workloads, reasonable adjustment of this parameter can effectively utilize computing resources and reduce computation time.

Note: parallelism cannot exceed the lesser of the numbers of licensed cores and logical cores minus one.

Examples

if (!existsCatalog("orca")) {
	createCatalog("orca")
}
go
use catalog orca

// If a stream graph with the same name already exists, destroy it first.
// dropStreamGraph('engine')
g = createStreamGraph('engine')

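// The first two entries are input columns carried to the output; the last two are the factors named by metricNames.
factor = [<createTime>, <updateTime>, <cumsum(qty)>, <cumavg(upToDatePrice)>]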

g.source("trades", 1000:0,  ["securityID1","securityID2","securityID3","createTime","updateTime","upToDatePrice","qty","value"], [STRING,STRING,STRING,TIMESTAMP,TIMESTAMP,DOUBLE,DOUBLE,INT])
.narrowReactiveStateEngine(metrics=factor,metricNames=["factor1","factor2"], keyColumn=["securityID1","securityID2","securityID3"])
.sink("output")
g.submit()
go

// Build num sample records that match the schema of the "trades" source.
num = 5
tmp = table(take("A" + lpad(string(1..4),4,"0"),num) as securityID1,
	take("CC.HH" + lpad(string(21..34),4,"0"),num) as securityID2,
	take("FFICE" + lpad(string(13..34),4,"0"),num) as securityID3,
	2023.09.01 00:00:00+(1..num) as createTime,
	2023.09.01 00:00:00+take(1..num, num).sort() as updateTime,
	take(rand(100.0,num) join take(int(),30),num) as upToDatePrice,
	take(take(100.0,num) join take(int(),30),num)+30 as qty,
	take(1..20 join take(int(),5),num) as value)

appendOrcaStreamTable("trades", tmp)

select * from orca_table.output
securityID1 securityID2 securityID3 createTime              updateTime              metricName metricValue
A0001       CC.HH0021   FFICE0013   2023.09.01 00:00:01.000 2023.09.01 00:00:01.000 factor1    130
A0001       CC.HH0021   FFICE0013   2023.09.01 00:00:01.000 2023.09.01 00:00:01.000 factor2    5.729826227745667
A0002       CC.HH0022   FFICE0014   2023.09.01 00:00:02.000 2023.09.01 00:00:02.000 factor1    130
A0002       CC.HH0022   FFICE0014   2023.09.01 00:00:02.000 2023.09.01 00:00:02.000 factor2    40.09022097935429
A0003       CC.HH0023   FFICE0015   2023.09.01 00:00:03.000 2023.09.01 00:00:03.000 factor1    130
A0003       CC.HH0023   FFICE0015   2023.09.01 00:00:03.000 2023.09.01 00:00:03.000 factor2    40.181519178922024
A0004       CC.HH0024   FFICE0016   2023.09.01 00:00:04.000 2023.09.01 00:00:04.000 factor1    130
A0004       CC.HH0024   FFICE0016   2023.09.01 00:00:04.000 2023.09.01 00:00:04.000 factor2    21.328769097950172
A0001       CC.HH0025   FFICE0017   2023.09.01 00:00:05.000 2023.09.01 00:00:05.000 factor1    130
A0001       CC.HH0025   FFICE0017   2023.09.01 00:00:05.000 2023.09.01 00:00:05.000 factor2    50.23656470375805
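In the narrow format, each factor occupies its own row, identified by metricName. If a wide view is easier to read, the output could be pivoted as sketched below. The query is an illustrative follow-up rather than part of the engine API, and it relies on createTime being unique per input record in this sample data.

```dolphindb
// Turn the metricName values into columns, one row per createTime.
// Assumes the stream graph from this example has been submitted and has received data.
select metricValue from orca_table.output pivot by createTime, metricName
```

The result would be expected to have one column per factor (factor1 and factor2) for each createTime.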