RocketMQ
DolphinDB 的 RocketMQ 插件,支持发送数据消息到 RocketMQ 集群、从 RocketMQ 集群中的 Topic 中接收数据。
在插件市场安装插件
版本要求
- DolphinDB Server: 2.00.9 及更高版本。
- OS: 64 位 Linux
安装步骤
- 在 DolphinDB 客户端中使用函数
listRemotePlugins
查看插件仓库中的插件信息。
login("admin", "123456")
listRemotePlugins()
- 使用函数
installPlugin
完成插件安装。
installPlugin("RocketMQ")
- 使用
loadPlugin
命令加载插件(即上一步返回的.txt 文件)。
loadPlugin("RocketMQ")
接口说明
RocketMQ::createProducer
详情
创建一个 RocketMQ 的 producer,可用于通过接口 RocketMQ::send 发送数据到 RocketMQ。
语法
RocketMQ::createProducer(namesrv, groupName, [producerConfig])
参数
namesrv: STRING 类型的标量,表示 RocketMQ 集群的 NameServer 的地址,形式为 <ip:port> 。
groupName: STRING 类型的标量,表示生产者组。
producerConfig: 类型为 dict(string, any),表示 producer 配置项,支持的配置项包括:
sessionCredentials: 用户凭证,需要输入一个 STRING 类型的数组,分别为 accessKey,secretKey,accessChannel。
namesrvDomain: STRING 类型的标量。当 RocketMQ 的客户端启动时,客户端会向指定的 NamesrvDomain 发送请求,以获取 Name Server 的地址列表。
nameSpace: STRING 类型的标量。通过使用 NameSpace,用户可以在一个 RocketMQ 集群中创建多个独立的逻辑分区,每个分区拥有独立的 Topic、Producer 和 Consumer。
instanceName: STRING 类型的标量,用于标识 RocketMQ 客户端实例的唯一名称。
unitName: STRING 类型的标量。用于标识 RocketMQ 的一个具体的业务单元的名称。
sendMsgTimeout: INT 类型标量,表示单次发送超时时间,单位是毫秒。必须大于 0,默认值为 3000。如果超过 sendMsgTimeout 毫秒后仍失败,则再次发送,直到发送次数超过 retryTimes。
retryTimes: INT 类型标量,表示发送重试次数。必须大于 0,默认值为 5。
compressMsgBodyOverHowmuch: INT 类型标量,用于指定消息体超过多大时启用压缩。必须大于 0,默认值为 4096,单位是字节。
compressLevel: INT 标量,表示压缩等级。范围为 1-9,默认值为 4。
maxMessageSize: INT 标量,表示单条消息的最大长度。必须大于 0,默认值为 133632,单位是字节。
tcpTransportConnectTimeout: INT 标量,表示连接超时时间。单位是毫秒,必须大于 0,默认值为 3000。
返回值
RocketMQ 的 producer 对象。
RocketMQ::send
详情
发送数据到 RocketMQ。
语法
RocketMQ::send(producer, data, topic, [tag = ""])
参数
producer: 通过 ·RocketMQ::createProducer· 创建的 producer 对象。
data: STRING、BLOB 类型的标量或向量,表示待发送的数据。
topic: STRING 类型的标量,表示待发送数据的主题。
tag: STRING 类型的标量,表示消息的 tag 标签。默认值为空字符串。
返回值
发送成功会返回 true。
示例
producer = RocketMQ::createProducer("192.168.1.38:9876", "group1");
msg = "msg1"
RocketMQ::send(msg)
msg2 = ["msg1", "msg2"]
RocketMQ::send(msg2)
RocketMQ::createSubJob
详情
创建一个 RocketMQ 的订阅任务,用于后台接收 RocketMQ 的数据。
语法
RocketMQ::createSubJob(namesrv, groupName, topic, handler, [subExpression = "*"], [consumerConfig])
参数
namesrv: STRING 类型的标量,表示 RocketMQ 集群的 NameServer 的地址,形式为 <ip:port> 。
groupName: STRING 类型的标量,表示消费者组。
topic: STRING 类型的标量,表示主题。
handler: 处理消息的函数句柄。是一个一元函数,接收的参数的数据结构是表。表中包含 BLOB 类型的 msg 列。
subExpression: STRING 类型的标量,用于消息过滤,默认值为”*”。
consumerConfig: consumer 配置项,类型为 dict(string, any),支持的配置项包括:
sessionCredentials: 用户凭证,需要输入一个 STRING 类型的数组,分别为 accessKey,secretKey,accessChannel。
messageModel: 两种消息模型分别为集群消费和广播消费。对应为”BROADCASTING“,”CLUSTERING“。默认为”CLUSTERING“。
consumeFromWhere: 消费者从哪个位置开始消费数据。目前支持”CONSUME_FROM_LAST_OFFSET”,”CONSUME_FROM_FIRST_OFFSET"。默认为”CONSUME_FROM_LAST_OFFSET"。
namesrvDomain: STRING 类型的标量。当 RocketMQ 的客户端启动时,客户端会向指定的 NamesrvDomain 发送请求,以获取 Name Server 的地址列表。
instanceName: STRING 类型的标量,用于标识 RocketMQ 客户端实例的唯一名称。
unitName: 类型为一个 STRING 标量,用于标识一个具体的业务单元的名称。
nameSpace: STRING 类型的标量。通过使用 NameSpace,用户可以在一个 RocketMQ 集群中创建多个独立的逻辑分区,每个分区拥有独立的 Topic、Producer 和 Consumer。
consumeThreadCount: 消费线程数。默认为 1。
pullMsgThreadPoolCount: 拉取 RocketMQ 数据的线程数。必须大于 0,默认为 1。
maxReconsumeTimes: INT 类型标量,表示当消费消息失败时,RocketMQ 会尝试重新发送该消息最多次数。每次重试都会将消息放到队列的末尾,并等待其他消费者来消费。设置的值必须大于 0,默认值为不会重试。
tcpTransportConnectTimeout: INT 类型标量,表示连接超时时间,单位是毫秒,必须大于 0,默认值为 3000。
asyncPull: 在接收到数据后,是否异步执行 handler。设置为 false 时,可以保证数据不丢失,默认为 false。
batchSize: 累计的消息数量超过 batchSize 后执行 handler。必须大于 0,默认值为 10000,在 asyncPull 配置为 true 时生效。
throttle: FLOAT 类型的标量,累计接收数据超过 throttle 时后执行 handler。必须大于 0,默认值为 1,单位为毫秒,在 asyncPull 配置为 true 时生效。
返回值
任务 ID,类型为 STRING 标量。
例子
def appendData(table1, data){
table1.append!(data)
}
table1 = table(1:0, ["msg"], [BLOB])
RocketMQ::createSubJob("192.168.1.38:9876", "group1", ”topic1", appendData{table1})
RocketMQ::cancelSubJob
详情
取消后台订阅 RocketMQ 的任务。
语法
RocketMQ::cancelSubJob(JobId)
参数
- JobId: STRING 类型的标量。通过
RocketMQ::createSubJob
或者是RocketMQ::getSubJobStat
返回的订阅任务 ID。
返回值
取消成功会返回 true。
RocketMQ::getSubJobStat
详情
查询当前 RocketMQ 后台订阅任务的信息。
语法
RocketMQ::getSubJobStat()
返回值
返回一个表,包含如下列:
jobID: 订阅 ID。STRING 类型。
startTime: 任务创建时间。NANOTIMESTAMP 类型。
endTime: 任务取消时间。NANOTIMESTAMP 类型。任务可以通过 TCPSocket::cancelSubJob 来取消。
firstMsgTime: 第一条数据的接收时间。NANOTIMESTAMP 类型。
lastMsgTime: 上一条消息的接收时间。NANOTIMESTAMP 类型。
processedMsgCount: 成功处理的消息行数。LONG 类型。
failedMsgCount: 处理失败的消息行数。LONG 类型。
lastErrMsg: 上一次处理失败的错误信息。STRING 类型。
lastFailedTimestamp: 上一次处理失败的时间。NANOTIMESTAMP 类型。