createCryptoOrderBookEngine

语法

createCryptoOrderBookEngine(name, dummyTable, inputColMap, [outputTable], depth, [updateRule='direct'], [errorHandler=NULL], [cachingInterval=5000], [timeout=-1], [outputHandler=NULL], [msgAsTable=false], [cachedDepth])

注:

社区版 License 暂不支持该引擎,如需使用此功能,请联系技术支持。

详情

根据数字货币全量深度快照和增量深度信息,实时更新订单簿。

返回值:一个表

参数

name 字符串标量,表示 CryptoOrderBook 引擎的名称,可包含字母,数字和下划线,但必须以字母开头。

dummyTable 一个表对象,表示输入表。dummyTable 需要通过 inputColMap 映射为如下列:

必需列1:

列名 类型 含义
symbol SYMBOL 数字货币代码。
isIncremental BOOL 是否为增量数据。true 表示增量数据,false 表示全量数据。
eventTime TIMESTAMP 事件时间。
askQty DECIMAL[] / DOUBLE[] 卖出数量的列表。
askPrice DECIMAL[] / DOUBLE[] 卖出价格的列表。
bidQty DECIMAL[] / DOUBLE[] 买入数量的列表。
bidPrice DECIMAL[] / DOUBLE[] 买入价格的列表。

必需列2:根据 updateRule 的取值进行调整。

updateRule ="direct" 时,不需要指定必需列2。

updateRule ="general" 时,指定以下列:

列名 类型 含义
prevUpdateId LONG 上一条数据的 ID。
updateId LONG 当前收到数据的 ID。

updateRule ="Binance-spot" 时,指定以下列:

列名 类型 含义
lastUpdateId LONG 从上次推送至今新增的最后一个 update Id。
firstUpdateId LONG 从上次推送至今新增的第一个 update Id。

updateRule ="Binance-futures" 时,指定以下列:

列名 类型 含义
lastUpdateId LONG 从上次推送至今新增的最后一个 update Id。
firstUpdateId LONG 从上次推送至今新增的第一个 update Id。
prevLastUpdateId LONG 上次推送的最后一个 update Id (即上条消息的 lastUpdateId)。

其它列:除上述必需列外的列。引擎会将这些列简单拷贝到 outputTable 的对应列中。

inputColMap 字典,将 dummyTable 中列字段的名称映射为引擎计算所需要的列。

outputTable 可选参数,一个表对象,用于输出最新的订单簿。其结构必须与 dummyTable 相同。outputTableoutputHandler 只需指定其一。

depth 正整数或字典,用于指定订单簿的深度。

  • 正整数:所有数字货币的订单簿深度都为该设置值。

  • 字典:键是字符串标量或向量,表示数字货币代码;值是整型,表示订单簿深度。对于指定的数字货币,将按照设定的深度输出订单簿。未被指定的数字货币将不输出订单簿结果。

updateRule 可选参数,字符串标量,表示订单簿更新规则。可选值为:

  • "direct":默认值,表示直接更新,系统将不进行数据丢失判断,仅根据 isIncremental 字段的值绝对是更新(isIncremental 为 true)还是覆盖(isIncremental 为 false)订单簿结果。

  • "general":表示通用的更新规则。要求按顺序插入增量数据,并确保数据中 updateId 单调递增。

  • "Binance-spot":表示适用于币安交易所现货的更新规则。

  • "Binance-futures":表示适用于币安交易所期货的更新规则。

errorHandler 可选参数,自定义函数。当增量数据出现缺失时,可通过该函数进行错误处理。包含 2 个参数:

  • 第一个是字符串标量,表示数字货币代码。

  • 第二个是整型标量,表示错误码,包括以下情况:

    • 1:接收到历史数据。

    • 2:接收到应在未来时间点到达的乱序数据。

    • 3:超时错误,指定时间间隔内没有新的订单簿合成。

    • 4:买卖价格交叉错误。判断交叉的条件为最高买价大于等于最低卖价。

cachingInterval 可选参数,整型标量,表示缓存增量深度数据的时间间隔。默认值为 5000,单位为毫秒。对于每个数字货币,当缓存中的数据时间与当前最新一条数据时间的差值小于等于 cachingInterval 时,该数据才会被保留在缓存中。

timeout 可选参数,整型标量,表示超时时间。单位为毫秒,默认值为 -1,表示不设置超时时间。在 timeout 指定的时间内未合成新的订单簿时,将触发超时错误,此时会通过 errorHandler 进行错误处理。

outputHandler 可选参数,一元函数。设置此参数时,引擎计算结束后,不再将计算结果写到输出表,而是会调用此函数处理计算结果。默认值为 NULL,表示仍将结果写到输出表。

msgAsTable 可选参数,布尔标量,表示在设置了参数 outputHandler 时,将引擎的计算结果以表的结构调用函数。默认值为 false,此时将计算结果的每一列作为元素组成元组。

cachedDepth 可选参数,正整数或字典,用于指定缓存中价格和数量(包括 askQty, askPrice, bidQty, bidPrice)的深度。

  • 正整数:缓存中所有数字货币的价格和数量将按照该值的深度进行缓存。

  • 字典:键是字符串标量或向量,表示数字货币代码;值是整型,表示对应的深度。对于指定的数字货币,将按照设定的深度缓存价格和数量。未被指定的数字货币将保留实际收到的所有价格和数量信息。

例子

运行代码前,先下载 ../data/CryptoOrderBookInput.zip 文件。

例1. 创建一个加密货币订单簿引擎,使用 "Binance-futures" 规则处理来自 Binance 交易所 U 本位合约的深度数据。根据 Binance 交易所数据发送规则,示例数据中首先为增量深度数据,在 68 行插入 1000 档全量快照数据后,引擎开始输出订单簿。

// 定义输入输出表结构
colNames = `isIncremental`exchange`eventTime`transactionTime`symbol`firstUpdateId`lastUpdateId`prevLastUpdateId`bidPrice`bidQty`askPrice`askQty
colTypes = [BOOL, SYMBOL, TIMESTAMP, TIMESTAMP, SYMBOL, LONG, LONG, LONG, DECIMAL128(18)[], DECIMAL128(8)[], DECIMAL128(18)[], DECIMAL128(8)[]]

// 创建输入输入输出表
share table(1:0, colNames, colTypes) as outputTable
share table(1:0, colNames, colTypes) as inputTable

inputTarget = ["symbol", "eventTime", "isIncremental", "bidPrice", "bidQty", "askPrice", "askQty", "lastUpdateId", "firstUpdateId", "prevLastUpdateId"]
inputSource = ["symbol", "eventTime", 'isIncremental', 'bidPrice', 'bidQty', 'askPrice', 'askQty', 'lastUpdateId', 'firstUpdateId', 'prevLastUpdateId']

inputColMap = dict(inputTarget, inputSource)


depth = dict(["BTCUSDT"], [1000])
cachedDepth = dict(["BTCUSDT"], [1500])

def errorHandler(instrument, code) {
    if (code == 1) {
        writeLog("handle hisotorical msg...")
    } else if (code == 2) {
        writeLog("handle unordered msg...")
    } else if (code == 3) {
        writeLog("handle timeout...")
    } else if (code == 4) {
        writeLog("handle corssed price...")
    } else {
        writeLog("unknown error!")
    }
}

// 创建引擎
engine = createCryptoOrderBookEngine(name="binanceFutures", dummyTable=inputTable, inputColMap=inputColMap, outputTable=outputTable, 
                                        depth=depth, updateRule="Binance-futures", errorHandler=errorHandler, cachingInterval=5000, 
                                        timeout=6000, msgAsTable=true, cachedDepth=cachedDepth)


// 加载示例数据
fin=file("binanceFuturesTestData.bin")
binanceFuturesTestData = fin.readObject()
fin.close();
// 数据插入引擎进行合成
getStreamEngine("binanceFutures").append!(binanceFuturesTestData)

// 清除环境
undef("inputTable", SHARED)
undef("outputTable", SHARED)
dropStreamEngine("binanceFutures")

例2. 创建一个加密货币订单簿引擎,使用 "general" 规则处理来自 OKX 交易所永续合约的深度数据。根据 OKX 交易所数据发送规则,示例数据中首行为 400 档快照数据,之后为增量快照数据。


// 定义输入输出表结构
colNames=['isIncremental', 'symbol', 'askPrice', 'askVolume', 'askNum', 'bidPrice', 'bidVolume', 'bidNum', 'checksum', 'prevSeqId', 'seqId', 'updateTime'];
colTypes=[BOOL, SYMBOL, DECIMAL128(18)[], DECIMAL128(8)[], INT[], DECIMAL128(18)[], DECIMAL128(8)[], INT[], LONG, LONG, LONG, TIMESTAMP];

// 创建输入输入输出表
share table(1:0, colNames, colTypes) as inputTable
share table(1:0, colNames, colTypes) as outputTable

inputTarget = ["symbol", "eventTime", "isIncremental", "bidPrice", "bidQty", "askPrice", "askQty", "prevUpdateId", "updateId"];
inputSource = ["symbol", "updateTime", 'isIncremental', 'bidPrice', 'bidVolume', 'askPrice', 'askVolume', 'prevSeqId', 'seqId'];

inputColMap = dict(inputTarget, inputSource)

depth = dict(["BTC-USD-SWAP"], [400])
cachedDepth = dict(["BTC-USD-SWAP"], [800])

def errorHandler(instrument, code) {
    if (code == 1) {
        writeLog("handle hisotorical msg...")
    } else if (code == 2) {
        writeLog("handle unordered msg...")
    } else if (code == 3) {
        writeLog("handle timeout...")
    } else if (code == 4) {
        writeLog("handle corssed price...")
    } else {
        writeLog("unknown error!")
    }
}

// 创建引擎
engine = createCryptoOrderBookEngine(name="okxSwap", dummyTable=inputTable, inputColMap=inputColMap, outputTable=outputTable, 
                                        depth=depth, updateRule="general", errorHandler=errorHandler, cachingInterval=5000, 
                                        timeout=6000, msgAsTable=true, cachedDepth=cachedDepth)


// 加载示例数据
fin=file("okxTestData.bin")
okxTestData = fin.readObject()
fin.close();
// 数据插入引擎进行合成
getStreamEngine("okxSwap").append!(okxTestData)

// 清除环境
undef("inputTable", SHARED)
undef("outputTable", SHARED)
dropStreamEngine("okxSwap")