udpServer

针对通过 UDP 协议发送、接收数据的应用场景,DolphinDB 提供了 udpServer 插件。插件基于 libevent 开源库 开发,可以作为 UDP 服务端监听 IP 地址、端口,并接收 UDP 客户端发送的数据到 DolphinDB Server 中;也可以作为 UDP 客户端,发送 UDP 数据到指定的 IP 地址、端口。

安装插件

版本要求

DolphinDB Server 2.00.16 版本,支持 Linux x86-64。

安装步骤

  1. 在 DolphinDB 客户端中调用 listRemotePlugins 函数查看插件仓库中的插件信息。

    注意:返回结果中仅展示当前操作系统和 server 版本支持的插件。若无预期插件,可在 DolphinDB 用户社区进行反馈。

    login("admin", "123456")
    listRemotePlugins()
  2. 调用 installPlugin 函数安装插件。

    installPlugin("udpServer")
  3. 调用 loadPlugin 函数加载插件。

    login("admin", "123456")
    loadPlugin("udpServer")

接口介绍

bind

语法

udpServer::bind(host, port, bufferSize)

详情

将 IP 地址及端口号绑定到 socket 上,返回一个句柄 ID。如果创建 socket 或绑定端口失败,则会抛出异常。如果 udpServer 接收的 UDP 包大小超过 bufferSize 设置值,则只读取 UDP 包前 bufferSize 字节的内容。

参数

host 是字符串标量,表示 udpServer 监听的 IP 地址。

port 是正整数标量,表示 udpServer 监听的端口号。

bufferSize 是正整数标量,表示该监听端口所能接收的 UDP 包大小的上限,单位为字节。客户端发送的 UDP 包大小应该不超过 bufferSize 设置值。bufferSize 的上限为 65535,所有小于 64 的 bufferSize 都会被设置为 64。

返回值

返回一个句柄 ID。

注意

  1. 如果调用 bind 接口时增加 bufferSize,在保证最高无丢包率的情况下,发送速度会降低,因此建议设置 bufferSize 为 1024 字节。

  2. bufferSize = 1024 时,socket 接收的发送包速度与丢包率关系如下:

    接收的发送包速度(万个/秒) 丢包率
    <10 0
    >=10 且 <20 <1/1000
    >=20 且 <50 约为50%

由上表可以看出,socket 接收的发送包速度越大,丢包率也越高。因此,不建议客户端每秒发送 50 万个以上 UDP 数据包到单个 socket。尽可能使单个 UDP 包的数据量达到 bufferSize,以降低发送的频率。

subscribe

语法

udpServer::subscribe(socket, topic, parser, handler)

详情

订阅并处理 udpServer 收到的数据。同一个 socket 可以同时创建多个订阅主题,订阅同一份数据。

参数

socketbind 函数返回的句柄 ID。

topic 是字符串标量,表示订阅的主题。

parser 是一元函数,它唯一的参数是一个字符串标量,返回值是一个表。

handler 是一个表,用来处理经 parser 解析后的数据。

注意

  • 若同一个 socket 同时创建多个订阅主题,订阅同一份数据,则 udpServer 会多次拷贝订阅的数据,进而影响性能。因此请尽量避免对接收到的数据量较大的数据进行多次订阅。

  • 若重新定义 handler,需要调用 unsubscribe 接口取消之前的订阅,再调用 subscribe 接口重新订阅。

  • 如果多个订阅需要输出到同一张表中,需要先调用 share 函数共享表,否则会导致 server crash。

unsubscribe

语法

udpServer::unsubscribe(socket, topic)

详情

取消订阅指定主题的数据流。

参数

socketbind 函数返回的句柄 ID。

topic 是一个字符串标量,表示订阅的主题。

unbind

语法

udpServer::unbind(socket)

详情

解除指定 socket 上绑定的 IP 地址和端口号。

参数

socketbind 函数返回的句柄 ID。

createPusher

语法

udpServer::createPusher(host, port, formatter, dummyTable)

详情

创建一个 UDP 的 pusher,注入该 pusher 的数据将被推送出去。支持两种用法:

  • 调用 append! 函数追加数据至 pusher。

  • 将流数据引擎输出表(outputTable)数据注入 pusher。

参数

host 是字符串标量,表示 UDP 的远端 IP 地址。

port 是正整数标量,表示 UDP 的远端端口。

formatter 是一个函数,用于指定待发布数据的打包格式。入参只有一个,形式为是 Table,返回值是一个 STRING 或 BLOB 类型的 Scalar 或 Vector。

dummyTable 是一个表对象,用于接收注入的数据。

例子

share streamTable(1000:0, `time`sym`volume, [TIMESTAMP, SYMBOL, INT]) as trades
output1 = table(10000:0, `time`sym`sumVolume, [TIMESTAMP, SYMBOL, INT])
pythonHost = "192.xxx.x.xx" // 更改为实际的 host
pythonPort = 1xxxx // 更改为实际的 port
pusher = udpServer::createPusher(pythonHost, pythonPort, toStdJson, output1)
engine1 = createTimeSeriesEngine(name="engine1", windowSize=60000, step=60000, metrics=<[sum(volume)]>, dummyTable=trades, outputTable=pusher, timeColumn=`time, useSystemTime=false, keyColumn=`sym, garbageSize=50, useWindowStartTime=false)
subscribeTable(tableName="trades", actionName="engine1", offset=0, handler=append!{engine1}, msgAsTable=true);
insert into trades values(2018.10.08T01:01:01.785,`A,10)
insert into trades values(2018.10.08T01:01:02.125,`B,26)
insert into trades values(2018.10.08T01:01:10.263,`B,14)
insert into trades values(2018.10.08T01:01:12.457,`A,28)
insert into trades values(2018.10.08T01:02:10.789,`A,15)
insert into trades values(2018.10.08T01:02:12.005,`B,9)
insert into trades values(2018.10.08T01:02:30.021,`A,10)
insert into trades values(2018.10.08T01:04:02.236,`A,29)
insert into trades values(2018.10.08T01:04:04.412,`B,32)

getSubscriptionInfo

语法

udpServer::getSubscriptionInfo()

详情

获取所有 socket 下的订阅主题及其创建时间。

getSocketInfo

语法

udpServer::getSocketInfo()

详情

获取当前 udpServer 绑定的所有 socket。

完整示例

客户端通过 UDP 协议发送数据至 127.x.x.x::30xxx。此处的 IP 地址及端口号为示例,实际使用时请指定真实的地址及端口。DolphinDB 通过 udpServer 插件接收数据,并进行处理。

// 加载插件
loadPlugin("udpServer")

// 绑定一个 socket, 设置 bufferSize 大小为 1024
// 实际使用时请指定真实的地址及端口
socket = udpServer::bind("127.x.x.x", 30xxx, 1024)

// 第一个订阅,将订阅的数据解析后存入持久化流表 test01 中
def f1(arg){
	return table([arg] as col1)
}
handler_1 = streamTable(10:0, [`wave], [BLOB])
enableTableShareAndPersistence(table=handler_1, tableName=`test01, asynWrite=true, compress=true,cacheSize=10000000, retentionMinutes=120)
udpServer::subscribe(socket, "subscribetest1", f1, handler_1)

// 第二个订阅,将订阅的数据解析后存入持久化流表 test02 中
def f2(arg){
	return table([arg] as col1)
}
handler_2 = streamTable(10:0, [`wave], [BLOB])
enableTableShareAndPersistence(table=handler_2, tableName=`test02, asynWrite=true, compress=true,cacheSize=10000000, retentionMinutes=120)
udpServer::subscribe(socket, "subscribetest2", f2, handler_2)

// 数据查询
select * from test01
select * from test02

// 状态查询
print(udpServer::getSubscriptionInfo())
print(udpServer::getSocketInfo())

// 取消订阅,取消绑定
udpServer::unsubscribe(socket, "subscribetest1")
udpServer::unsubscribe(socket, "subscribetest2")
udpServer::unbind(socket)