StreamGraph::submit
Syntax
StreamGraph::submit([checkpointConfig])
Arguments
checkpointConfig (optional) is a dictionary that specifies configuration options related to stream graph checkpoints. Available options are:
Details
Submits a stream graph.
In a cluster deployment, this function must be run on a compute node, and the user must be an administrator or have the COMPUTE_GROUP_EXEC permission to submit the task successfully.
In a single-node deployment, permission checks are not required, and the stream graph can be submitted directly.
| key | Description | Value Range | Default | 
|---|---|---|---|
| enable | Whether to enable Checkpoint | true/false | false | 
| interval | Time interval to trigger Checkpoint, in milliseconds | [10 seconds, 1 year] | 1 hour | 
| timeout | Timeout for Checkpoint. If Checkpoint is not completed within the specified time, it will be considered failed, in milliseconds | [1 second, 1 hour] | 10 minutes | 
| alignedTimeout | Timeout for Barrier alignment. If alignment is not completed within the specified time, the Checkpoint will be considered failed, in milliseconds | [100 milliseconds, 1 hour] | 10 minutes | 
| minIntervalBetweenCkpt | Minimum time interval between the completion of the last Checkpoint and the initiation of the next Checkpoint | [0, 1 year] | 0 | 
| consecutiveFailures | Maximum number of consecutive Checkpoint failures. If exceeded, the status of the entire streaming graph will be switched to ERROR. | [0, 102400] | 3 | 
| maxConcurrentCheckpoints | Maximum number of concurrent Checkpoints allowed. Please note that allowing concurrent Checkpoints may impact running streaming jobs. | [1, 102400] | 1 | 
| maxRetainedCheckpoints | The system will periodically clean up historical Checkpoint data. This parameter sets the maximum number of latest Checkpoints to retain. | [1, 1024] | 3 | 
Examples
Submit a stream graph g with custom checkpoint settings.
For more information on stream graph submission and usage, see the Orca page.
if (!existsCatalog("demo")) {
	createCatalog("demo")
}
go
use catalog demo
// Define checkpoint config
ckptConfig = {
    "enable":true,
    "interval": 10000,
    "timeout": 36000,
    "maxConcurrentCheckpoints": 1
};
// Define aggregators
aggregators = [
    <first(price) as open>,
    <max(price) as high>,
    <min(price) as low>,
    <last(price) as close>,
    <sum(volume) as volume>
]
indicators = [
    <time>,
    <high>,
    <low>,
    <close>,
    <volume>
]
// Create and configure stream graph
g = createStreamGraph("indicators") 
g.source("trade", `time`symbol`price`volume, [DATETIME,SYMBOL,DOUBLE,LONG])
    .timeSeriesEngine(windowSize=60, step=60, metrics=aggregators, timeColumn=`time, keyColumn=`symbol)
    .buffer("one_min_bar")
    .reactiveStateEngine(metrics=indicators, keyColumn=`symbol)
    .buffer("one_min_indicators")
// Submit stream graph with checkpoint config
g.submit(ckptConfig)Related functions: getOrcaCheckpointConfig, setOrcaCheckpointConfig.
