createStreamBroadcastEngine
Syntax
createStreamBroadcastEngine(name, dummyTable, outputTables)
Details
createStreamBroadcastEngine creates a stream broadcast engine that distributes the same data stream to multiple target tables. The function returns a table object; data ingested into that table is broadcast to every table specified in outputTables.
Use this engine when you need to process a single stream of data in multiple ways. For example, save one copy to disk while sending another copy to a computing engine for further processing.
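The ingest-and-broadcast behavior described above can be sketched without a subscription by writing directly to the returned table object. This is a minimal sketch, not a definitive implementation: the names inTable, outA, outB, and broadcastSketch are hypothetical, and plain in-memory tables stand in for real targets. The two queries at the end are a check that each output table received the same rows.

```
// Hypothetical tables sharing one schema; none of these names come from the reference above
inTable = table(1:0, `sym`price, [STRING,DOUBLE])
outA = table(100:0, `sym`price, [STRING,DOUBLE])
outB = table(100:0, `sym`price, [STRING,DOUBLE])

// Create the engine with two in-memory tables as targets
engine = createStreamBroadcastEngine(name="broadcastSketch", dummyTable=inTable, outputTables=[outA, outB])

// Ingest two rows into the table object returned by the engine
tableInsert(engine, table(["A1","A2"] as sym, [100.5,101.2] as price))

// Compare the row counts of the two targets
select count(*) from outA
select count(*) from outB

// Release the engine when it is no longer needed
dropStreamEngine("broadcastSketch")
```

In practice the returned table is usually passed to a subscription handler, such as tableInsert{engine}, rather than written to directly.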
Arguments
name is a string indicating the name of the engine. It uniquely identifies the engine on a data node or compute node. It can contain letters, digits, and underscores (_), and must start with a letter.
dummyTable is a table object whose schema must be the same as the subscribed stream table. Whether dummyTable contains data does not matter.
outputTables is a tuple of two or more table objects (which can be in-memory tables, DFS tables, or streaming engines). The schema of each table object must be the same as dummyTable.
Examples
share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as tickStream
share streamTable(1000:0, `sym`factor1, [STRING,DOUBLE]) as resultStream
t=table(100:0, `sym`price, [STRING,DOUBLE])
//define the output tables: a reactive state engine and a DFS table for storing data
rse = createReactiveStateEngine(name="reactiveDemo", metrics =<cumavg(price)>, dummyTable=tickStream, outputTable=resultStream, keyColumn="sym")
if(existsDatabase("dfs://database1")){
    dropDatabase("dfs://database1")
}
db=database("dfs://database1", VALUE, "A"+string(1..10))
pt=db.createPartitionedTable(t,`pt,`sym)
//create the stream broadcast engine
broadcastEngine=createStreamBroadcastEngine(name="broadcastEngine", dummyTable=tickStream, outputTables=[loadTable("dfs://database1", `pt),getStreamEngine("reactiveDemo")])
//subscribe to the tickStream stream table
subscribeTable(tableName=`tickStream, actionName="sub", handler=tableInsert{broadcastEngine}, msgAsTable = true)
//ingest the subscribed data into the engine
n=100000
symbols=take(("A" + string(1..10)),n)
prices=100+rand(1.0,n)
t1=table(symbols as sym, prices as price)
tickStream.append!(t1)
//check the number of records in the DFS table
select count(*) from loadTable("dfs://database1", `pt)
// output: 100,000
//check the status of the reactive state streaming engine
getStreamEngineStat().ReactiveStreamEngine
name | user | status | lastErrMsg | numGroups | numRows | numMetrics | memoryInUsed | snapshotDir | snapshotInterval | snapshotMsgId | snapshotTimestamp |
---|---|---|---|---|---|---|---|---|---|---|---|
reactiveDemo | admin | OK | | 10 | 100,000 | 1 | 2,600 | | | -1 | |