createStreamBroadcastEngine
语法
createStreamBroadcastEngine(name, dummyTable, outputTables)
详情
创建流数据广播引擎,将同一份流数据复制后分发至不同的目标表对象。该函数返回一个表对象,通过向该表对象注入数据,实现流数据的多路广播。
该引擎的应用场景是对同一份流数据同时进行不同的处理,例如将拷贝后的一份数据存入磁盘,另一份数据则输入引擎进行后续计算等。
参数
name 字符串标量,表示流数据广播引擎的名称,作为其在一个数据节点/计算节点上的唯一标识。可包含字母,数字和下划线,但必须以字母开头。
dummyTable 一个表对象,和订阅的流数据表的 schema 一致,可以含有数据,亦可为空表。
outputTables 由2个及以上表对象组成的元组,每个表对象的结构和 dummyTable 相同。表对象可以是内存表、分布式表或流计算引擎。
例子
share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as tickStream
share streamTable(1000:0, `sym`factor1, [STRING,DOUBLE]) as resultStream
t=table(100:0, `sym`price, [STRING, DOUBLE])
//定义将要使用的输出表。这里定义1个状态引擎,1个分布式表用于存储数据
rse = createReactiveStateEngine(name="reactiveDemo", metrics =<cumavg(price)>, dummyTable=tickStream, outputTable=resultStream, keyColumn="sym")
if(existsDatabase("dfs://database1")){
dropDatabase("dfs://database1")
}
db=database("dfs://database1", VALUE, "A"+string(1..10))
pt=db.createPartitionedTable(t,`pt,`sym)
//定义广播引擎
broadcastEngine=createStreamBroadcastEngine(name="broadcastEngine", dummyTable=tickStream, outputTables=[loadTable("dfs://database1", `pt),getStreamEngine("reactiveDemo")])
//订阅流数据表tickStream
subscribeTable(tableName=`tickStream, actionName="sub", handler=tableInsert{broadcastEngine}, msgAsTable = true)
//订阅的数据注入引擎
n=100000
symbols=take(("A" + string(1..10)),n)
prices=100+rand(1.0,n)
t1=table(symbols as sym, prices as price)
tickStream.append!(t1)
// 查看分布式表中数据有100,000 条
select count(*) from loadTable("dfs://database1", `pt)
// output: 100,000
//查看状态引擎状态
getStreamEngineStat().ReactiveStreamEngine
name |
user |
status |
lastErrMsg |
numGroups |
numRows |
numMetrics |
memoryInUsed |
snapshotDir |
snapshotInterval |
snapshotMsgId |
snapshotTimestamp |
---|---|---|---|---|---|---|---|---|---|---|---|
reactiveDemo | admin | OK | 10 | 100,000 | 1 | 2,600 | -1 |