createStreamDispatchEngine
Syntax
createStreamDispatchEngine(name, dummyTable, keyColumn, outputTable, [dispatchType='hash'], [hashByBatch=false], [outputLock=true], [queueDepth=4096], [outputElapsedTime=false], [mode='buffer'])
Details
The createStreamDispatchEngine function creates a stream dispatch engine that distributes incoming data streams to specified output tables for computational load balancing. The output tables can be in-memory tables, DFS tables, or other streaming engines. The function returns a table object.
Key characteristics of the stream dispatch engine:
Supports multithreaded input and output of streams.
Used only for data dispatching, not for metrics computing.
Typical usage:
The stream dispatch engine can distribute market data to one or more computational streaming engines that calculate factors. This achieves optimal performance by balancing the computational load.
Arguments
name is a string indicating the name of the engine. It is the unique identifier of an engine. It can contain letters, numbers and underscores but must start with a letter.
dummyTable is a table whose schema must be the same as the stream table to which the engine subscribes. Whether dummyTable contains data does not matter.
keyColumn (optional) is a string. If provided, the ingested data will be distributed to output tables based on the values in this column. Unique values in keyColumn are treated as keys.
outputTable is one or more tables that the engine outputs data to. When outputElapsedTime = false, outputTable must have the same schema as dummyTable; when outputElapsedTime = true, outputTable should have two additional columns - a LONG column and an INT column (see outputElapsedTime).
Up to 100 tables can be specified for outputTable. The engine starts a thread for each output table to process the distributed data. To specify multiple output tables, pass a tuple, embedding sub-tuples if needed. For example:
To distribute evenly to 4 tables, specify outputTable=[table1, table2, table3, table4].
To distribute evenly to 2 replicated table sets, specify outputTable=[[table1_1, table1_2], [table2_1, table2_2]]. This maintains 2 replicas of the ingested data: replica 1 is distributed across table1_1 and table1_2, and replica 2 across table2_1 and table2_2.
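As a rough illustration of the two outputTable forms, the sketch below creates one engine with a flat tuple and one with a nested tuple. All names (dummy, t1..t4, r1_1..r2_2, flatDispatch, replicaDispatch) are hypothetical, and plain in-memory tables stand in for real downstream targets.

```
// hypothetical dummy table; only its schema matters
dummy = streamTable(1:0, `sym`price, [STRING,DOUBLE])

// flat form: each ingested record is written to one of the 4 tables
t1 = table(1:0, `sym`price, [STRING,DOUBLE])
t2 = table(1:0, `sym`price, [STRING,DOUBLE])
t3 = table(1:0, `sym`price, [STRING,DOUBLE])
t4 = table(1:0, `sym`price, [STRING,DOUBLE])
flatEngine = createStreamDispatchEngine(name="flatDispatch", dummyTable=dummy, keyColumn=`sym, outputTable=[t1, t2, t3, t4])

// nested form: 2 replicas, each replica spread over 2 tables
r1_1 = table(1:0, `sym`price, [STRING,DOUBLE])
r1_2 = table(1:0, `sym`price, [STRING,DOUBLE])
r2_1 = table(1:0, `sym`price, [STRING,DOUBLE])
r2_2 = table(1:0, `sym`price, [STRING,DOUBLE])
replicaEngine = createStreamDispatchEngine(name="replicaDispatch", dummyTable=dummy, keyColumn=`sym, outputTable=[[r1_1, r1_2], [r2_1, r2_2]])
```

With the nested form, every ingested record should appear once in the first pair of tables and once in the second pair.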
dispatchType (optional) is a string. It can be:
“hash” (default) - Apply a hash algorithm on keyColumn and distribute records based on the hash result. Hash distribution can be uneven across tables.
“uniform” - Evenly distribute records across output tables based on keyColumn values.
The default “hash” is recommended unless data distribution is highly uneven and impacts performance. In that case, try “uniform”.
hashByBatch (optional) is a Boolean value. The default is false, meaning that for each batch of data ingested into the engine, records are first grouped by keyColumn values and the groups are then distributed across tables based on dispatchType.
hashByBatch can be set to true only when dispatchType is 'hash'. In this case, for each ingested batch of data, the engine randomly selects a key, computes its hash value, and distributes the entire batch based on the hash result.
Note: Setting hashByBatch = false ensures that records with identical keys are output to the same table. However, grouping records by key adds processing cost.
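The combinations of dispatchType and hashByBatch described above can be sketched as follows. The engine and table names are hypothetical, and the in-memory output tables are placeholders for whatever targets are actually used.

```
// hypothetical dummy table; only its schema matters
dummy = streamTable(1:0, `sym`price, [STRING,DOUBLE])
outA = table(1:0, `sym`price, [STRING,DOUBLE])
outB = table(1:0, `sym`price, [STRING,DOUBLE])
outC = table(1:0, `sym`price, [STRING,DOUBLE])
outD = table(1:0, `sym`price, [STRING,DOUBLE])
outE = table(1:0, `sym`price, [STRING,DOUBLE])
outF = table(1:0, `sym`price, [STRING,DOUBLE])

// defaults written out: records are grouped by sym within each batch,
// so records with the same key go to the same output table
perKeyEngine = createStreamDispatchEngine(name="perKeyDispatch", dummyTable=dummy, keyColumn=`sym, outputTable=[outA, outB], dispatchType="hash", hashByBatch=false)

// the whole batch is routed by the hash of one key (requires dispatchType="hash")
perBatchEngine = createStreamDispatchEngine(name="perBatchDispatch", dummyTable=dummy, keyColumn=`sym, outputTable=[outC, outD], dispatchType="hash", hashByBatch=true)

// uniform dispatch, worth trying when hash dispatch is too uneven
uniformEngine = createStreamDispatchEngine(name="uniformDispatch", dummyTable=dummy, keyColumn=`sym, outputTable=[outE, outF], dispatchType="uniform")
```

hashByBatch=true skips the per-key grouping step, so it is a reasonable choice only when downstream processing does not depend on all records of a key arriving in the same table.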
outputLock (optional) is a Boolean value indicating whether to lock the output table(s) to prevent concurrent write conflicts. The default is true (recommended). False means that no lock is applied to the output table(s).
An output table, essentially an in-memory table, does not allow concurrent writes. As threads working for other streaming engines or subscriptions may also write to the output tables, the lock ensures thread safety. However, locking comes at a performance cost. If it can be guaranteed no other threads will write to the output tables concurrently, outputLock can be set to false to optimize performance.
queueDepth (optional) is a positive integer controlling the queue or buffer size for each output thread. The default is 4096 (records).
When mode = “buffer”, queueDepth sets the size of the cache table for each thread of an output table;
When mode = “queue”, queueDepth sets the maximum depth of each output queue.
Set queueDepth based on the expected data volume: if the ingested amount is small, a large queueDepth wastes memory; if the ingested amount is large, a small queueDepth may cause output blocking.
outputElapsedTime (optional) is a Boolean value indicating whether to output the elapsed time to process each ingested batch, from ingestion to output. The default is false. If outputElapsedTime = true, two extra columns are added to each output table: a LONG column for the time elapsed in microseconds to process each data batch internally, and an INT column for nanosecond timestamps of when each batch was output.
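A minimal sketch of an output schema for outputElapsedTime = true, assuming the two extra columns are appended after the dummyTable columns. The column names elapsedMicros and batchInfo and all table and engine names are hypothetical; only the LONG and INT types come from the description above.

```
// hypothetical dummy table; only its schema matters
dummy = streamTable(1:0, `sym`price, [STRING,DOUBLE])

// output schema = dummy schema + a LONG column + an INT column
// (column names are placeholders chosen for this sketch)
timedOut1 = table(1:0, `sym`price`elapsedMicros`batchInfo, [STRING,DOUBLE,LONG,INT])
timedOut2 = table(1:0, `sym`price`elapsedMicros`batchInfo, [STRING,DOUBLE,LONG,INT])

timedEngine = createStreamDispatchEngine(name="timedDispatch", dummyTable=dummy, keyColumn=`sym, outputTable=[timedOut1, timedOut2], outputElapsedTime=true)
```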
mode (optional) is a string. It can be:
“buffer” (default) - For each thread working for an output table, the engine creates an in-memory cache table to buffer pending writes. It copies data into the cache before writing to the output table. Use this mode when (1) the input source(s) may have concurrent reads/writes while ingesting data into the engine; or (2) the input source(s) frequently append small batches of data to the engine.
“queue” - For each thread working for an output table, the engine maintains a queue of references to input data. Input data is not copied, only referenced. This requires that there be no concurrent reads/writes to the input source(s) during ingestion. This mode is best when the input source(s) infrequently append large batches of data.
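The two modes can be selected as sketched below. The names and the queueDepth values are illustrative assumptions, not tuning recommendations.

```
// hypothetical dummy table; only its schema matters
dummy = streamTable(1:0, `sym`price, [STRING,DOUBLE])
bufOut1 = table(1:0, `sym`price, [STRING,DOUBLE])
bufOut2 = table(1:0, `sym`price, [STRING,DOUBLE])
qOut1 = table(1:0, `sym`price, [STRING,DOUBLE])
qOut2 = table(1:0, `sym`price, [STRING,DOUBLE])

// buffer mode: queueDepth is the size of each output thread's cache table
bufferEngine = createStreamDispatchEngine(name="bufferDispatch", dummyTable=dummy, keyColumn=`sym, outputTable=[bufOut1, bufOut2], queueDepth=4096, mode="buffer")

// queue mode: queueDepth is the maximum depth of each output queue;
// the input must not be read or written concurrently during ingestion
queueEngine = createStreamDispatchEngine(name="queueDispatch", dummyTable=dummy, keyColumn=`sym, outputTable=[qOut1, qOut2], queueDepth=1024, mode="queue")
```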
Examples
Distribute data of a stream table to 3 reactive state streaming engines for metric computation using the stream dispatch engine. The final results are output into one single table.
//define the input stream table for the stream dispatch engine
$ share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as tickStream
//define the output table for the reactive state engines
$ share streamTable(1000:0, `sym`factor1, [STRING,DOUBLE]) as resultStream
//define 3 reactive state engines as the output tables for the stream dispatch engine
$ for(i in 0..2){
$     rse = createReactiveStateEngine(name="reactiveDemo"+string(i), metrics =<cumavg(price)>, dummyTable=tickStream, outputTable=resultStream, keyColumn="sym")
$ }
//create the stream dispatch engine
$ dispatchEngine=createStreamDispatchEngine(name="dispatchDemo", dummyTable=tickStream, keyColumn=`sym, outputTable=[getStreamEngine("reactiveDemo0"),getStreamEngine("reactiveDemo1"),getStreamEngine("reactiveDemo2")])
// the stream dispatch engine subscribes to the stream table
$ subscribeTable(tableName=`tickStream, actionName="sub", handler=tableInsert{dispatchEngine}, msgAsTable = true)
// ingest data to the stream dispatch engine
$ n=100000
$ symbols=take(("A" + string(1..10)),n)
$ prices=100+rand(1.0,n)
$ t=table(symbols as sym, prices as price)
$ tickStream.append!(t)
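// the subscription is processed asynchronously, so the count reaches n only after all data has been dispatched and computed
$ select count(*) from resultStream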
100,000
// check the status of the reactive state engines
$ getStreamEngineStat().ReactiveStreamEngine
name | user | status | lastErrMsg | numGroups | numRows | numMetrics | memoryInUsed | snapshotDir | snapshotInterval | snapshotMsgId | snapshotTimestamp
---|---|---|---|---|---|---|---|---|---|---|---
reactiveDemo2 | admin | OK | | 1 | 10,000 | 1 | 921 | | | -1 |
reactiveDemo1 | admin | OK | | 5 | 50,000 | 1 | 1,437 | | | -1 |
reactiveDemo0 | admin | OK | | 4 | 40,000 | 1 | 1,308 | | | -1 |
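To rerun the example, the subscription, engines, and shared tables it created need to be released first. One possible cleanup sequence, using only the names defined in the example above, is:

```
// stop feeding the dispatch engine
unsubscribeTable(tableName=`tickStream, actionName="sub")

// drop the dispatch engine, then the 3 reactive state engines it wrote to
dropStreamEngine("dispatchDemo")
for(i in 0..2){
    dropStreamEngine("reactiveDemo" + string(i))
}

// release the shared stream tables
undef(`tickStream, SHARED)
undef(`resultStream, SHARED)
```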