StreamGraph::setLocalConfigOnce
Syntax
StreamGraph::setLocalConfigOnce(dict)
Arguments
dict is a dictionary, supporting the following key-value pairs:
| Key | Type | Default Value | Description | 
|---|---|---|---|
| subscription.batchSize | INT | 0 | Sets the batchSize parameter for subscriptions between adjacent nodes in the stream graph. | 
| subscription.throttle | INT | 1 | Sets the throttle parameter for subscriptions between adjacent nodes in the stream graph. | 
| subscription.timeTrigger | BOOL | false | Sets the timeTrigger parameter for subscriptions between adjacent nodes in the stream graph. | 
| subscription.sourceOffset | INT | -3 | Sets the offset parameter for subscriptions between nodes created by StreamGraph::source and their directly connected downstream nodes in the stream graph. | 
Details
Sets the subscription configuration between adjacent nodes in the stream graph. Once this function is called, the specified settings override any corresponding global configuration set via StreamGraph::setConfigMap; keys that were not previously defined are added as new configurations.
The configuration takes effect only once, on the subscription between the calling node and its directly connected downstream node. It does not apply if the two nodes are part of a cascade.
Note: Because operations such as sink and map do not generate new stream graph nodes, the configuration propagates past them and takes effect on the next actual node in the graph.
Examples
The following example sets batchSize to 100 for the subscription that feeds the 1-minute K-line computation node, while all other subscriptions in the stream graph retain the default batchSize value.
if (!existsCatalog("orca")) {
	createCatalog("orca")
}
use catalog orca
g = createStreamGraph("indicators")
sourceStreams = g.source("trade", 1024:0, `symbol`datetime`price`volume, [SYMBOL, TIMESTAMP, DOUBLE, INT])
    .fork(2)
stream_1min = sourceStreams[0]
    .setLocalConfigOnce({
      "subscription.batchSize": 100
    })
    .timeSeriesEngine(60*1000, 60*1000, <[first(price),max(price),min(price),last(price),sum(volume)]>, "datetime", false, "symbol")
    .reactiveStateEngine(<[datetime, first_price, max_price, min_price, last_price, sum_volume, mmax(max_price, 5), mavg(sum_volume, 5)]>, `symbol)
    .sink("output_1min")
stream_5min = sourceStreams[1]
    .timeSeriesEngine(5*60*1000, 5*60*1000, <[first(price),max(price),min(price),last(price),sum(volume)]>, "datetime", false, "symbol")
    .reactiveStateEngine(<[datetime, first_price, max_price, min_price, last_price, sum_volume, mmax(max_price, 5), mavg(sum_volume, 5)]>, `symbol)
    .sink("output_5min")
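The override behavior described in Details can be illustrated with a further sketch. It assumes that StreamGraph::setConfigMap is called on the graph object and accepts the same subscription.batchSize key as setLocalConfigOnce; the graph name "configDemo", the table name "output_demo", and the values 1000, 100, and 5 are hypothetical and chosen only for illustration.

```
use catalog orca
g = createStreamGraph("configDemo")

// Graph-wide setting (assumption: setConfigMap takes a dictionary
// with the same subscription.* keys as setLocalConfigOnce)
g.setConfigMap({
    "subscription.batchSize": 1000
})

g.source("trade", 1024:0, `symbol`datetime`price`volume, [SYMBOL, TIMESTAMP, DOUBLE, INT])
    // Local override: applies once, to the subscription between the
    // source node and its directly connected downstream node
    .setLocalConfigOnce({
        "subscription.batchSize": 100,
        "subscription.throttle": 5
    })
    .timeSeriesEngine(60*1000, 60*1000, <[first(price),max(price),min(price),last(price),sum(volume)]>, "datetime", false, "symbol")
    .sink("output_demo")
```

Under these assumptions, the subscription from the source to the time-series engine would use batchSize 100 and throttle 5, while any other subscription in the graph would continue to use the graph-wide batchSize of 1000.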