TCPSocket

通过 DolphinDB 的 TCPSocket 插件,用户可以通过 TCP 连接到指定的 IP 和端口,然后遵循自定义格式的传输报文进行数据的接收和发送。

安装插件

版本要求

DolphinDB Server 2.00.10 及更高版本,支持 Linux x64, Linux ARM, Linux JIT。

安装步骤

  1. 在 DolphinDB 客户端中使用 listRemotePlugins 命令查看插件仓库中的插件信息。

    注意:仅展示当前操作系统和 server 版本支持的插件。若无预期插件,可自行编译(请选择对应分支下的插件)或在 DolphinDB 用户社区进行反馈。

    login("admin", "123456")
    listRemotePlugins()
  2. 使用 installPlugin 命令完成插件安装。

    installPlugin("TCPSocket")
  3. 使用 loadPlugin 命令加载插件。

    loadPlugin("TCPSocket")

接口说明

createSubJob

语法

TCPSocket::createSubJob(ip, port, handler, [config])

详情

创建一个 TCP 连接,连接指定的 IP 和 port,并创建一个后台任务,接收 TCP 的数据。返回一个 STRING 的标量,可用于后续 TCPSocket::cancelSubJob 使用。

注意: TCPSocket::createSubJob 创建的 job 不会随着 session 关闭自动退出,必须手动调用 TCPSocket::cancelSubJob 结束。

参数

ip STRING 类型标量,表示需要连接的服务端的 IP 地址。

port INT 类型的标量,表示需要连接的服务端的端口。

handler 一个一元函数对象,用于解析从 TCP 的连接里接收到的数据。 handler 需要的函数签名为:

  •   def hdndler(data){
      }

    其中输入的 data 是一个表,包含如下列:

    • bytes:列类型为 BLOB,接收到的 TCP 数据。最大长度为 10240 字节。
    • isHead:列类型为 BOOL,表示该行所接的 TCP 数据是否和上一行是连续的,每次 TCP 连接后,第一行数据的 isHead 为 true。
  • config:一个字典,类型为 (STRING, ANY),用于配置特定的参数。目前仅支持 “maxRows*”*。

    • maxRows:INT 类型的标量,表示每次执行 parser 时的最多的数据行数。默认为 128;最小值为1。

getSubJobStat

语法

TCPSocket::getSubJobStat()

详情

获取所有的 TCP 后台任务信息,包含已经通过 TCPSocket::cancelSubJob 取消的任务。

返回一个表,包含如下列:

  • tag:列类型为 STRING,表示 TCP 的订阅任务标识。
  • startTime:列类型为 NANOTIMESTAMP,表示任务创建时间。
  • endTime:列类型为 NANOTIMESTAMP,表示任务取消时间。
  • firstMsgTime:列类型为 NANOTIMESTAMP,表示第一条数据的接收时间。
  • lastMsgTime:列类型为 NANOTIMESTAMP, 表示上一条消息的接收时间。
  • processedMsgCount:列类型为 LONG,表示成功处理的消息行数。
  • failedMsgCount:列类型为 LONG ,表示处理失败的消息行数。
  • lastErrMsg:列类型为 STRING,表示上一次处理失败(TCP 连接失败、TCP 连接断开、读取 TCP 数据失败等情况)的错误信息。
  • lastFailedTimestamp:列类型为 NANOTIMESTAMP ,表示上一次处理失败(同上)的时间。

cancelSubJob

语法

TCPSocket::cancelSubJob(tag)

详情

取消一个 TCP 的后台任务。

参数

tag 类型为 STRING 标量,表示TCP 任务标识。可以通过 TCPSocket::getSubJobStat () 查询获得或者 TCPSocket::createSubJob 函数返回获得。

connect

语法

TCPSocket::connect(ip, port)

详情

创建一个阻塞模式的 TCP 连接。TCPSocket::connect 创建的连接。返回一个 TCP 的 socket 资源,可用于后续在 TCPSocket::readTCPSocket::close 中使用。

注意:在当前 session 关闭时,插件会自动调用 TCPSocket::close 以关闭连接。

参数

ip STRING 类型的标量,表示连接 TCP socket 服务端的 IP 地址。

port INT 类型的标量,表示连接 TCP socket 服务端的端口。

read

语法

TCPSocket::read(socket, [size])

详情

读取 TCP 的 socket 里的数据。返回一个 BLOB 类型的标量。

参数

socket 通过 TCPSocket::connect 创建的 socket 连接。

size 类型为 INT 的标量,表示读取的最大字节数。默认是 10240 字节数;必须大于0。

write

语法

TCPSocket::write(socket, data)

详情

向 TCP 的 socket 写数据。如果写入成功,会返回 true。如果写入失败,会抛出异常。

参数

socket 通过 TCPSocket::connect 创建的 socket 连接。

data BLOB 或 STRING 类型的标量,需要往 socket 写的数据。

close

语法

TCPSocket::close(socket)

详情

关闭一个 TCP 的 socket 连接。返回 true。

参数

socket 通过 TCPSocket::connect 创建的 socket 连接。

使用示例

def handler(mutable table1, data){
	table1.append!(data)
}
share table(1:0, `bytes`isHead, [BLOB, BOOL]) as t
config = dict(STRING, ANY)
config[`maxRows] = 8192
	
TCPSocket::createSubJob("192.168.1.38", 20002, handler{t}, config)

附录

IO_ERROR 错误码以及出错原因

如果在执行插件接口后返回或者在 log 里有如下异常信息:[PLUGIN::TCPSocket]: failed to XXX with IO error type $。建议查看下表以排查具体原因。

IO_ERROR 值原因
1Socket is disconnected/closed or file is closed.
3Out of memory, no disk space, or no buffer.
7Reach the end of a file or a buffer.
8File is readable but not writable.
9File is writable but not readable.
10A file doesn't exist or the socket destination is not reachable.
13Unknown IO error.