DStream::cryptoOrderBookEngine

Syntax

DStream::cryptoOrderBookEngine(inputColMap, depth, [updateRule='direct'], [errorHandler=NULL], [cachingInterval=5000], [timeout=-1], [cachedDepth])

Details

Creates a real-time order book engine for cryptocurrency stream processing, which updates the order book in real time based on full-depth snapshots and incremental depth information. For details, see createCryptoOrderBookEngine.

Return value: A DStream object.

Arguments

inputColMap is a dictionary mapping column names in dummyTable to the required columns.

depth is an integer or dictionary specifying the depth of the order book.
  • Integer: Applies the same depth for all cryptocurrencies.
  • Dictionary: Keys are cryptocurrency codes, and values are the depth for each. If a cryptocurrency is not specified, it will not output an order book result.
updateRule (optional) is a string specifying the order book update rule.
  • "direct" (default): Updates directly based on isIncremental field (true for update, false for overwrite).
  • "general": General update rule, requiring streaming data with monotonically increasing update IDs.
  • "Binance-spot": Update rule for Binance spot data.
  • "Binance-futures": Update rule for Binance futures data.
errorHandler (optional) is a UDF to handle errors when incremental data is missing. It takes two arguments:
  • The first argument is a string representing the cryptocurrency code.
  • The second argument is an integer representing the error code, with possible values:
    • 1: Received old data.
    • 2: Received out-of-order data expected at a future time.
    • 3: Timeout, indicating no new order book update within the specified time.
    • 4: Crossed prices error, where the highest bid is greater than or equal to the lowest ask.

cachingInterval(optional) is an integer indicating the interval (in milliseconds) within which incremental data is cached. The default is 5000. For each crypto, data is retained in the cache if the time difference between the first data in cache and the latest is no greater than cachingInterval.

timeout (optional) is an integer specifying the timeout period in milliseconds. The default is -1 (no timeout). If order book is not updated within this period, the errorHandler will be invoked.

cachedDepth (optional) is an integer or dictionary specifying the depth for cached order books.
  • Integer: Applies the same depth for cached order books for all cryptocurrencies.
  • Dictionary: Keys are cryptocurrency codes, and values are the depth for each. If a cryptocurrency is not specified, full-depth order books are cached.

Examples

if (!existsCatalog("orca")) {
	createCatalog("orca")
}
go
use catalog orca

// Define input table schema
colNames = `isIncremental`exchange`eventTime`transactionTime`symbol`firstUpdateId`lastUpdateId`prevLastUpdateId`bidPrice`bidQty`askPrice`askQty
colTypes = [BOOL, SYMBOL, TIMESTAMP, TIMESTAMP, SYMBOL, LONG, LONG, LONG, DECIMAL128(18)[], DECIMAL128(8)[], DECIMAL128(18)[], DECIMAL128(8)[]]

inputTarget = ["symbol", "eventTime", "isIncremental", "bidPrice", "bidQty", "askPrice", "askQty", "lastUpdateId", "firstUpdateId", "prevLastUpdateId"]
inputSource = ["symbol", "eventTime", 'isIncremental', 'bidPrice', 'bidQty', 'askPrice', 'askQty', 'lastUpdateId', 'firstUpdateId', 'prevLastUpdateId']

// Map input columns
inputColMap = dict(inputTarget, inputSource)

// Set depth
depth = dict(["BTCUSDT"], [1000])

cptGraph = createStreamGraph("cptEngine")
cptGraph.source("cptInput", 1000:0, colNames,colTypes)
.cryptoOrderBookEngine(inputColMap, depth)
.sink("output")
cptGraph.submit()