StreamGraph::submit
Syntax
StreamGraph::submit([checkpointConfig])
Arguments
checkpointConfig (optional) is a dictionary that specifies configuration options for stream graph checkpoints. Available options are:
key | Description | Value Range | Default |
---|---|---|---|
enable | Whether to enable Checkpoint | true/false | false |
interval | Interval at which a Checkpoint is triggered, in milliseconds | [10 seconds, 1 year] | 1 hour |
timeout | Checkpoint timeout, in milliseconds. A Checkpoint that does not complete within this time is considered failed. | [1 second, 1 hour] | 10 minutes |
alignedTimeout | Barrier alignment timeout, in milliseconds. If alignment does not complete within this time, the Checkpoint is considered failed. | [100 milliseconds, 1 hour] | 10 minutes |
minIntervalBetweenCkpt | Minimum interval between the completion of the previous Checkpoint and the start of the next one | [0, 1 year] | 0 |
consecutiveFailures | Maximum number of consecutive Checkpoint failures. If this number is exceeded, the status of the entire stream graph switches to ERROR. | [0, 102400] | 3 |
maxConcurrentCheckpoints | Maximum number of Checkpoints allowed to run concurrently. Note that concurrent Checkpoints may affect running streaming jobs. | [1, 102400] | 1 |
maxRetainedCheckpoints | Maximum number of most recent Checkpoints to retain when the system periodically cleans up historical Checkpoint data | [1, 1024] | 3 |
Details
Submits a stream graph.
In a cluster deployment, this function must be run on a compute node, and the user must be an administrator or have the COMPUTE_GROUP_EXEC permission for the submission to succeed.
In a single-node deployment, no permission check is performed, and the stream graph can be submitted directly.
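As a reference for the options in the table above, the following sketch writes out a checkpointConfig dictionary with every key set explicitly. It assumes that the durations given in the Default column convert to milliseconds as noted in the comments (1 hour = 3600000 ms, 10 minutes = 600000 ms). The variable name fullCkptConfig is hypothetical, and enable is set to true here, unlike its default of false, so that the remaining options take effect.

```
// Hypothetical example: every option spelled out at its documented default,
// except "enable", which is switched on.
// interval:       1 hour     = 3600000 ms
// timeout:        10 minutes = 600000 ms
// alignedTimeout: 10 minutes = 600000 ms
// minIntervalBetweenCkpt: default 0 (the table does not state its unit)
fullCkptConfig = {
    "enable": true,
    "interval": 3600000,
    "timeout": 600000,
    "alignedTimeout": 600000,
    "minIntervalBetweenCkpt": 0,
    "consecutiveFailures": 3,
    "maxConcurrentCheckpoints": 1,
    "maxRetainedCheckpoints": 3
};
```

Any key left out of the dictionary presumably falls back to the default listed in the table, so in practice only the options that differ from their defaults need to be specified.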
Examples
Submit a stream graph g with custom checkpoint settings.
For more information on stream graph submission and usage, see the Orca page.
if (!existsCatalog("demo")) {
    createCatalog("demo")
}
go
use catalog demo
// Define checkpoint config (interval = 10000 ms = 10 s, timeout = 36000 ms = 36 s)
ckptConfig = {
    "enable": true,
    "interval": 10000,
    "timeout": 36000,
    "maxConcurrentCheckpoints": 1
};
// Define aggregators: build one-minute OHLC bars and total volume from trades
aggregators = [
    <first(price) as open>,
    <max(price) as high>,
    <min(price) as low>,
    <last(price) as close>,
    <sum(volume) as volume>
]
// Define indicators: columns output by the reactive state engine
indicators = [
    <time>,
    <high>,
    <low>,
    <close>,
    <volume>
]
// Create and configure stream graph
g = createStreamGraph("indicators")
g.source("trade", 1:0, `time`symbol`price`volume, [DATETIME,SYMBOL,DOUBLE,LONG])
    .timeSeriesEngine(windowSize=60, step=60, metrics=aggregators, timeColumn=`time, keyColumn=`symbol)
    .buffer("one_min_bar")
    .reactiveStateEngine(metrics=indicators, keyColumn=`symbol)
    .buffer("one_min_indicators")
// Submit stream graph with checkpoint config
g.submit(ckptConfig)
Related functions: getOrcaCheckpointConfig, setOrcaCheckpointConfig.