RocketMQ

DolphinDB 的 RocketMQ 插件,支持发送数据消息到 RocketMQ 集群、从 RocketMQ 集群中的 Topic 中接收数据。

在插件市场安装插件

版本要求

  • DolphinDB Server: 2.00.9 及更高版本。
  • OS: 64 位 Linux

安装步骤

  1. 在 DolphinDB 客户端中使用函数 listRemotePlugins 查看插件仓库中的插件信息。
login("admin", "123456")
listRemotePlugins()
  1. 使用函数 installPlugin 完成插件安装。
installPlugin("RocketMQ")
  1. 使用 loadPlugin 命令加载插件(即上一步返回的.txt 文件)。
loadPlugin("RocketMQ")

接口说明

RocketMQ::createProducer

详情

创建一个 RocketMQ 的 producer,可用于通过接口 RocketMQ::send 发送数据到 RocketMQ。

语法

RocketMQ::createProducer(namesrv, groupName, [producerConfig])

参数

  • namesrv: STRING 类型的标量,表示 RocketMQ 集群的 NameServer 的地址,形式为 <ip:port> 。

  • groupName: STRING 类型的标量,表示生产者组。

  • producerConfig: 类型为 dict(string, any),表示 producer 配置项,支持的配置项包括:

  • sessionCredentials: 用户凭证,需要输入一个 STRING 类型的数组,分别为 accessKey,secretKey,accessChannel。

  • namesrvDomain: STRING 类型的标量。当 RocketMQ 的客户端启动时,客户端会向指定的 NamesrvDomain 发送请求,以获取 Name Server 的地址列表。

  • nameSpace: STRING 类型的标量。通过使用 NameSpace,用户可以在一个 RocketMQ 集群中创建多个独立的逻辑分区,每个分区拥有独立的 Topic、Producer 和 Consumer。

  • instanceName: STRING 类型的标量,用于标识 RocketMQ 客户端实例的唯一名称。

  • unitName: STRING 类型的标量。用于标识 RocketMQ 的一个具体的业务单元的名称。

  • sendMsgTimeout: INT 类型标量,表示单次发送超时时间,单位是毫秒。必须大于 0,默认值为 3000。如果超过 sendMsgTimeout 毫秒后仍失败,则再次发送,直到发送次数超过 retryTimes。

  • retryTimes: INT 类型标量,表示发送重试次数。必须大于 0,默认值为 5。

  • compressMsgBodyOverHowmuch: INT 类型标量,用于指定消息体超过多大时启用压缩。必须大于 0,默认值为 4096,单位是字节。

  • compressLevel: INT 标量,表示压缩等级。范围为 1-9,默认值为 4。

  • maxMessageSize: INT 标量,表示单条消息的最大长度。必须大于 0,默认值为 133632,单位是字节。

  • tcpTransportConnectTimeout: INT 标量,表示连接超时时间。单位是毫秒,必须大于 0,默认值为 3000。

返回值

RocketMQ 的 producer 对象。

RocketMQ::send

详情

发送数据到 RocketMQ。

语法

RocketMQ::send(producer, data, topic, [tag = ""])

参数

  • producer: 通过 ·RocketMQ::createProducer· 创建的 producer 对象。

  • data: STRING、BLOB 类型的标量或向量,表示待发送的数据。

  • topic: STRING 类型的标量,表示待发送数据的主题。

  • tag: STRING 类型的标量,表示消息的 tag 标签。默认值为空字符串。

返回值

发送成功会返回 true。

示例

producer = RocketMQ::createProducer("192.168.1.38:9876", "group1");
msg = "msg1"
RocketMQ::send(msg)
msg2 = ["msg1", "msg2"]
RocketMQ::send(msg2)

RocketMQ::createSubJob

详情

创建一个 RocketMQ 的订阅任务,用于后台接收 RocketMQ 的数据。

语法

RocketMQ::createSubJob(namesrv, groupName, topic, handler, [subExpression = "*"], [consumerConfig])

参数

  • namesrv: STRING 类型的标量,表示 RocketMQ 集群的 NameServer 的地址,形式为 <ip:port> 。

  • groupName: STRING 类型的标量,表示消费者组。

  • topic: STRING 类型的标量,表示主题。

  • handler: 处理消息的函数句柄。是一个一元函数,接收的参数的数据结构是表。表中包含 BLOB 类型的 msg 列。

  • subExpression: STRING 类型的标量,用于消息过滤,默认值为”*”。

  • consumerConfig: consumer 配置项,类型为 dict(string, any),支持的配置项包括:

  • sessionCredentials: 用户凭证,需要输入一个 STRING 类型的数组,分别为 accessKey,secretKey,accessChannel。

  • messageModel: 两种消息模型分别为集群消费和广播消费。对应为”BROADCASTING“,”CLUSTERING“。默认为”CLUSTERING“。

  • consumeFromWhere: 消费者从哪个位置开始消费数据。目前支持”CONSUME_FROM_LAST_OFFSET”,”CONSUME_FROM_FIRST_OFFSET"。默认为”CONSUME_FROM_LAST_OFFSET"。

  • namesrvDomain: STRING 类型的标量。当 RocketMQ 的客户端启动时,客户端会向指定的 NamesrvDomain 发送请求,以获取 Name Server 的地址列表。

  • instanceName: STRING 类型的标量,用于标识 RocketMQ 客户端实例的唯一名称。

  • unitName: 类型为一个 STRING 标量,用于标识一个具体的业务单元的名称。

  • nameSpace: STRING 类型的标量。通过使用 NameSpace,用户可以在一个 RocketMQ 集群中创建多个独立的逻辑分区,每个分区拥有独立的 Topic、Producer 和 Consumer。

  • consumeThreadCount: 消费线程数。默认为 1。

  • pullMsgThreadPoolCount: 拉取 RocketMQ 数据的线程数。必须大于 0,默认为 1。

  • maxReconsumeTimes: INT 类型标量,表示当消费消息失败时,RocketMQ 会尝试重新发送该消息最多次数。每次重试都会将消息放到队列的末尾,并等待其他消费者来消费。设置的值必须大于 0,默认值为不会重试。

  • tcpTransportConnectTimeout: INT 类型标量,表示连接超时时间,单位是毫秒,必须大于 0,默认值为 3000。

  • asyncPull: 在接收到数据后,是否异步执行 handler。设置为 false 时,可以保证数据不丢失,默认为 false。

  • batchSize: 累计的消息数量超过 batchSize 后执行 handler。必须大于 0,默认值为 10000,在 asyncPull 配置为 true 时生效。

  • throttle: FLOAT 类型的标量,累计接收数据超过 throttle 时后执行 handler。必须大于 0,默认值为 1,单位为毫秒,在 asyncPull 配置为 true 时生效。

返回值

任务 ID,类型为 STRING 标量。

例子

def appendData(table1, data){
  table1.append!(data)
}
table1 = table(1:0, ["msg"], [BLOB])
RocketMQ::createSubJob("192.168.1.38:9876", "group1", ”topic1", appendData{table1})

RocketMQ::cancelSubJob

详情

取消后台订阅 RocketMQ 的任务。

语法

RocketMQ::cancelSubJob(JobId)

参数

  • JobId: STRING 类型的标量。通过 RocketMQ::createSubJob 或者是 RocketMQ::getSubJobStat 返回的订阅任务 ID。

返回值

取消成功会返回 true。

RocketMQ::getSubJobStat

详情

查询当前 RocketMQ 后台订阅任务的信息。

语法

RocketMQ::getSubJobStat()

返回值

返回一个表,包含如下列:

  • jobID: 订阅 ID。STRING 类型。

  • startTime: 任务创建时间。NANOTIMESTAMP 类型。

  • endTime: 任务取消时间。NANOTIMESTAMP 类型。任务可以通过 TCPSocket::cancelSubJob 来取消。

  • firstMsgTime: 第一条数据的接收时间。NANOTIMESTAMP 类型。

  • lastMsgTime: 上一条消息的接收时间。NANOTIMESTAMP 类型。

  • processedMsgCount: 成功处理的消息行数。LONG 类型。

  • failedMsgCount: 处理失败的消息行数。LONG 类型。

  • lastErrMsg: 上一次处理失败的错误信息。STRING 类型。

  • lastFailedTimestamp: 上一次处理失败的时间。NANOTIMESTAMP 类型。