WebSocket

DolphinDB 的 WebSocket 插件提供了强大的 WebSocket 客户端功能。作为一个 WebSocket 客户端,该插件能够连接到 WebSocket 服务端,并实现双向通信。它支持从 WebSocket 服务端读取数据,例如实时的行情数据、交易数据等。同时,插件也支持向 WebSocket 服务端发送数据,比如下单指令、订阅请求等。除此之外,该插件还能够提取 WebSocket 通信过程中的响应头文件,帮助用户更好地了解通信状态和获取相关信息。该插件目前仅支持 wss 协议。

安装插件

版本要求

DolphinDB Server:2.00.12 及更高版本,支持 Linux x86-64。

安装步骤

  1. 在DolphinDB 客户端中使用 listRemotePlugins 命令查看插件仓库中的插件信息。

    注意:仅展示当前操作系统和 server 版本支持的插件。若无预期插件,可自行编译(请自行选择对应分支下的插件)或在 DolphinDB 用户社区进行反馈。

    login("admin", "123456")
    listRemotePlugins()
  2. 使用 installPlugin 命令完成插件安装。

    installPlugin("WebSocket")
  3. 使用 loadPlugin 命令加载插件。

    loadPlugin("WebSocket")

接口说明

createSubJob

语法

WebSocket::createSubJob(url, onOpen, onMessage, onError, onClose, tag, [config])

详情

创建一个 WebSocket 订阅,返回一个 WebSocket 对象。如果订阅 tag 已存在且正在运行,则抛出错误;否则覆盖之前的订阅信息。

参数

url STRING 类型标量,表示 WebSocket 的地址。

onOpen 一个函数,其第一个参数为 WebSocket 对象。通过该参数指定建立连接时将调用的 onOpen 函数。

函数签名如下:

def onOpen(ws){}

onMessage 一个函数,其第一个参数为 WebSocket 对象;第二个参数是一个表,表示订阅的数据,其仅包含一个列,类型为 BLOB,列名为 msg。通过该参数指定 WebSocket 数据接入时将调用 onMessage 函数。

函数签名如下:

def onMessage(ws, data){}

onError 一个函数,其第一个参数为 WebSocket 对象;第二个参数是错误信息,为 BLOB 类型标量。通过该参数指定发生错误时将调用的 onError 方法。

函数签名如下:

def onError(ws, error){}

onClose 一个函数,其第一个参数为 WebSocket 对象,第二个参数是错误码,为 INT 类型标量;第三个参数是关闭信息,为 BLOB 类型标量。通过该参数指定关闭订阅时将调用的 onClose 方法。

函数签名如下:

def onClose(ws, statusCode, msg){}

tag STRING 类型标量,表示 WebSocket 的订阅标识。

config 字典,用于指定一些额外的配置项。其键为 STRING 类型标量、值为 ANY。目前支持的键为:

  • "proxy":设置代理地址,键值为 STRING 类型标量,例如 “http://proxy.example.com:8080/"。

  • "reconnectCount":用于设置非用户主动取消订阅的情况下,连接断开后重新连接的尝试次数,键值为 INT 类型标量。默认是 -1,表示会一直尝试重连直到成功连接。

share table(1:0, ["data"], [BLOB]) as table1
def onOpen(ws){
  WebSocket::send(ws, '{"method": "SUBSCRIBE","params": ["btcusdt@aggTrade","btcusdt@depth"], "id": 1}')
}
def onMessage(ws, data){
  	table1.append!(table(data.msg as col1))
}

def onError(ws, error){
  writeLog("failed to receive data: " + error)
}
def onClose(ws, statusCode, msg){
  writeLog("connection is close, statusCode: " + statusCode + ", " + msg)
}
config = dict(STRING, ANY)
config[`proxy] = "http://proxy.example.com:8080/"
ws = WebSocket::createSubJob(url, onOpen, onMessage, onError, onClose, "sub1", config)

send(ws, data)

语法

WebSocket::send(ws, data)

详情

发送 WebSocket 消息,可以在 WebSocket::createSubJob 的参数 onOpenonMessage 中调用,也可以使用WebSocket::createSubJob 返回的 WebSocket 对象发送消息。返回一个布尔值,表示是否发送成功。

参数

ws WebSocket 对象。

data BLOB 类型标量,表示需要发送的数据。

getSubJobStat()

语法

WebSocket::getSubJobStat()

详情

获取所有的 WebSocket 后台任务信息,包含已经通过 WebSocket::cancelSubJob 取消的任务。注意,如果不同任务的 tag 值相同,前一个任务的信息将会被后一个任务的信息覆盖掉。返回一个表,包含如下列:

  • tag:WebSocket 任务标识。
  • startTime:任务创建时间。
  • endTime:任务取消时间。任务可以通过 WebSocket::cancelSubJob 来取消,初始值是空值,表示任务未被取消,或者是重连成功。
  • firstMsgTime:第一条数据的接收时间。
  • lastMsgTime:上一条消息的接收时间。
  • processedMsgCount:成功接收的消息行数。初始值是0。
  • failedNetMsgCount:网络错误的消息行数。初始值是0。
  • failedMsgCount:处理失败的消息行数。初始值是0。
  • netErrCount:上一次处理失败的错误信息。
  • lastOnFuncErrMsg:上一次回调函数出现的错误。
  • lastFailedTimestamp:上一次处理失败的时间。

cancelSubJob

语法

WebSocket::cancelSubJob(tag)

详情

取消一个 WebSocket 的后台任务,返回一个布尔值。

参数

tag STRING 类型标量,表示 WebSocket 的任务标识。可以通过 WebSocket::getSubJobStat() 获取 tag

使用示例

tag = (exec tag from WebSocket::getSubJobStat())[0]
WebSocket::cancelSubJob(tag)

getResponseHeader

语法

WebSocket::getResponseHeader(ws, key)

详情

获取 WebSocket 的消息头。返回一个字符串标量。该消息头可以在 WebSocket::createSubJob 的参数 onOpenonMessage 中调用。

参数

ws WebSocket 对象。

key STRING 类型标量,表示需要获取的 Header 的对应的 key。

data = WebSocket::getResponseHeader(ws, "server")