StreamGraph::submit

Syntax

StreamGraph::submit([checkpointConfig])

Arguments

checkpointConfig (optional) is a dictionary of configuration options for stream graph checkpoints. The available options are listed under Details.

Details

Submits a stream graph.

In a cluster deployment, this function must be run on a compute node, and the user must be an administrator or have the COMPUTE_GROUP_EXEC permission to submit the task successfully.

In a single-node deployment, permission checks are not required, and the stream graph can be submitted directly.

checkpointConfig supports the following options:

| Key | Description | Value range | Default |
|---|---|---|---|
| enable | Whether to enable checkpointing. | true/false | false |
| interval | Interval at which checkpoints are triggered, in milliseconds. | [10 seconds, 1 year] | 1 hour |
| timeout | Checkpoint timeout, in milliseconds. A checkpoint that does not complete within this time is considered failed. | [1 second, 1 hour] | 10 minutes |
| alignedTimeout | Barrier alignment timeout, in milliseconds. If alignment does not complete within this time, the checkpoint is considered failed. | [100 milliseconds, 1 hour] | 10 minutes |
| minIntervalBetweenCkpt | Minimum interval between the completion of the previous checkpoint and the start of the next one. | [0, 1 year] | 0 |
| consecutiveFailures | Maximum number of consecutive checkpoint failures. If this number is exceeded, the status of the entire stream graph switches to ERROR. | [0, 102400] | 3 |
| maxConcurrentCheckpoints | Maximum number of checkpoints allowed to run concurrently. Concurrent checkpoints may affect running streaming jobs. | [1, 102400] | 1 |
| maxRetainedCheckpoints | Maximum number of most recent checkpoints to retain. The system periodically cleans up older checkpoint data. | [1, 1024] | 3 |
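
As a rough illustration of how the options above fit together, the following sketch builds a fuller checkpointConfig dictionary. The values are hypothetical and chosen only to fall inside the documented ranges. Time values are written in milliseconds, as documented for interval, timeout, and alignedTimeout. minIntervalBetweenCkpt is left out because its unit is not stated here.

```dolphindb
// Hypothetical values for illustration, not recommendations
// interval:       300000 ms = 5 minutes
// timeout:         60000 ms = 1 minute
// alignedTimeout:  30000 ms = 30 seconds
ckptConfig = {
    "enable": true,
    "interval": 300000,
    "timeout": 60000,
    "alignedTimeout": 30000,
    "consecutiveFailures": 5,
    "maxConcurrentCheckpoints": 1,
    "maxRetainedCheckpoints": 5
}
```

A dictionary like this would be passed to submit in the same way as in the example below. Options that are not set are expected to keep the defaults listed above.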

Examples

Submit a stream graph g with custom checkpoint settings.

For more information on stream graph submission and usage, see the Orca page.

// Create the catalog if it does not exist yet
if (!existsCatalog("demo")) {
    createCatalog("demo")
}
go
use catalog demo

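// Define checkpoint config (time values are in milliseconds):
// trigger a checkpoint every 10 seconds and time out after 36 seconds
ckptConfig = {
    "enable": true,
    "interval": 10000,
    "timeout": 36000,
    "maxConcurrentCheckpoints": 1
};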

// Define aggregators
aggregators = [
    <first(price) as open>,
    <max(price) as high>,
    <min(price) as low>,
    <last(price) as close>,
    <sum(volume) as volume>
]
// Define metrics for the reactive state engine
indicators = [
    <time>,
    <high>,
    <low>,
    <close>,
    <volume>
]

// Create and configure stream graph:
// trade source -> 1-minute bars (time series engine) -> indicators (reactive state engine)
g = createStreamGraph("indicators")
g.source("trade", 1:0, `time`symbol`price`volume, [DATETIME,SYMBOL,DOUBLE,LONG])
    .timeSeriesEngine(windowSize=60, step=60, metrics=aggregators, timeColumn=`time, keyColumn=`symbol)
    .buffer("one_min_bar")
    .reactiveStateEngine(metrics=indicators, keyColumn=`symbol)
    .buffer("one_min_indicators")

// Submit stream graph with checkpoint config
g.submit(ckptConfig)

Related functions: getOrcaCheckpointConfig, setOrcaCheckpointConfig.
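
After submission, the related functions can presumably be used to inspect or adjust the checkpoint settings of the graph. The sketch below is a minimal illustration and rests on several assumptions that are not confirmed on this page: that both functions take the stream graph name as their first argument, that the graph from the example can be addressed as "demo.orca_graph.indicators", and that setOrcaCheckpointConfig accepts a dictionary containing only the options to change. Check the reference pages of both functions before relying on it.

```dolphindb
// Assumption: the graph is addressed by its fully qualified name
graphName = "demo.orca_graph.indicators"

// Assumption: returns the current checkpoint configuration of the graph
cfg = getOrcaCheckpointConfig(graphName)
print(cfg)

// Hypothetical update: trigger a checkpoint every 60 seconds (60000 ms)
newCfg = {"interval": 60000}
setOrcaCheckpointConfig(graphName, newCfg)
```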