DStream::cryptoOrderBookEngine
Syntax
DStream::cryptoOrderBookEngine(inputColMap, depth, [updateRule='direct'],
[errorHandler=NULL], [cachingInterval=5000], [timeout=-1],
[cachedDepth])
Details
Creates a real-time order book engine for cryptocurrency stream processing, which updates the order book in real time based on full-depth snapshots and incremental depth information. For details, see createCryptoOrderBookEngine.
Return value: A DStream object.
Arguments
inputColMap is a dictionary mapping column names in dummyTable to the required columns.
depth is the depth of the order book to output. It can be an integer or a dictionary:
- Integer: Applies the same depth for all cryptocurrencies.
- Dictionary: Keys are cryptocurrency codes, and values are the depth for each. If a cryptocurrency is not specified, it will not output an order book result.
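updateRule (optional) is a string specifying the rule used to update the order book. It can be:
- "direct" (default): Updates directly based on the isIncremental field (true for update, false for overwrite).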
- "general": General update rule, requiring streaming data with monotonically increasing update IDs.
- "Binance-spot": Update rule for Binance spot data.
- "Binance-futures": Update rule for Binance futures data.
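errorHandler (optional) is a function that is invoked when an error occurs. The default is NULL. It takes two arguments:
- The first argument is a string representing the cryptocurrency code.
- The second argument is an integer representing the error code, with possible values: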
- 1: Received old data.
- 2: Received out-of-order data expected at a future time.
- 3: Timeout, indicating no new order book update within the specified time.
- 4: Crossed prices error, where the highest bid is greater than or equal to the lowest ask.
cachingInterval (optional) is an integer indicating the interval (in milliseconds) within which incremental data is cached. The default is 5000. For each cryptocurrency, data is retained in the cache as long as the time difference between the first data in the cache and the latest data is no greater than cachingInterval.
timeout (optional) is an integer specifying the timeout period in milliseconds. The default is -1 (no timeout). If the order book is not updated within this period, the errorHandler is invoked with error code 3.
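cachedDepth (optional) specifies the depth of the cached order books. It can be an integer or a dictionary:
- Integer: Applies the same depth for cached order books for all cryptocurrencies.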
- Dictionary: Keys are cryptocurrency codes, and values are the depth for each. If a cryptocurrency is not specified, full-depth order books are cached.
Examples
if (!existsCatalog("orca")) {
    createCatalog("orca")
}
go
use catalog orca
// Define input table schema
colNames = `isIncremental`exchange`eventTime`transactionTime`symbol`firstUpdateId`lastUpdateId`prevLastUpdateId`bidPrice`bidQty`askPrice`askQty
colTypes = [BOOL, SYMBOL, TIMESTAMP, TIMESTAMP, SYMBOL, LONG, LONG, LONG, DECIMAL128(18)[], DECIMAL128(8)[], DECIMAL128(18)[], DECIMAL128(8)[]]
inputTarget = ["symbol", "eventTime", "isIncremental", "bidPrice", "bidQty", "askPrice", "askQty", "lastUpdateId", "firstUpdateId", "prevLastUpdateId"]
inputSource = ["symbol", "eventTime", 'isIncremental', 'bidPrice', 'bidQty', 'askPrice', 'askQty', 'lastUpdateId', 'firstUpdateId', 'prevLastUpdateId']
// Map input columns
inputColMap = dict(inputTarget, inputSource)
// Set depth
depth = dict(["BTCUSDT"], [1000])
cptGraph = createStreamGraph("cptEngine")
cptGraph.source("cptInput", 1000:0, colNames, colTypes)
.cryptoOrderBookEngine(inputColMap, depth)
.sink("output")
cptGraph.submit()
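The optional parameters can be supplied in the same call. The following is a minimal sketch rather than a verified configuration: it reuses colNames, colTypes, and inputColMap from the example above, passes the optional arguments positionally in the order given under Syntax, and assumes that a user-defined function can be passed as errorHandler. The handler name logOrderBookError, the graph, source, and sink names, the ETHUSDT entry, and the depth and timeout values are hypothetical and chosen only for illustration.

```
// Hypothetical handler: receives the cryptocurrency code and the error code (1-4)
def logOrderBookError(symbol, errorCode) {
    writeLog("Order book error for " + symbol + ": code " + string(errorCode))
}
// Output depth per cryptocurrency; codes not listed here produce no order book output
depth = dict(["BTCUSDT", "ETHUSDT"], [1000, 500])
// Cached depth for BTCUSDT only; ETHUSDT is not listed, so its full-depth order book is cached
cachedDepth = dict(["BTCUSDT"], [5000])
handlerGraph = createStreamGraph("cptEngineWithHandler")
handlerGraph.source("cptInputWithHandler", 1000:0, colNames, colTypes)
.cryptoOrderBookEngine(inputColMap, depth, "general", logOrderBookError, 5000, 10000, cachedDepth)
.sink("outputWithHandler")
handlerGraph.submit()
```

With these assumed values, the handler would be called with error code 3 if a cryptocurrency's order book is not updated for 10000 milliseconds.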