RocketMQ
DolphinDB 的 RocketMQ 插件,支持发送数据消息到 RocketMQ 集群、从 RocketMQ 集群中的 Topic 中接收数据。
安装插件
版本要求
DolphinDB Server:2.00.10 及更高版本。支持 x64 的 Linux 版本。
安装步骤
在 DolphinDB 客户端中使用
listRemotePlugins
命令查看插件仓库中的插件信息。注意:仅展示当前操作系统和 server 版本支持的插件。若无预期插件,可自行编译(请自行选择对应分支下的插件)或在 DolphinDB 用户社区进行反馈。
login("admin", "123456") listRemotePlugins()
使用
installPlugin
命令完成插件安装。installPlugin("RocketMQ")
使用
loadPlugin
命令加载插件。loadPlugin("RocketMQ")
接口说明
createProducer
语法
createProducer(namesrv, groupName, [producerConfig])
详情
创建一个 RocketMQ 的 producer,可用于通过接口 send 发送数据到 RocketMQ。
一个 RocketMQ 的 producer 对象。
参数
namesrv STRING 类型标量,表示 RocketMQ 集群的 NameServer 的地址,形式为 "ip:port" 。
groupName STRING 类型标量,表示生产者组。
producerConfig 一个字典,类型为 (STRING, ANY),表示 producer 配置项,支持的配置项包括:
- sessionCredentials:STRING 类型的数组,代表用户凭证,元素分别为 accessKey,secretKey,accessChannel 。
- namesrvDomain:STRING 类型标量。当 RocketMQ 的客户端启动时,客户端会向指定的 NamesrvDomain 发送请求,以获取 Name Server 的地址列表。
- nameSpace:STRING 类型标量。通过使用 NameSpace,用户可以在一个 RocketMQ 集群中创建多个独立的逻辑分区,每个分区拥有独立的 Topic、Producer 和 Consumer。
- instanceName:STRING 类型标量,用于标识 RocketMQ 客户端实例的唯一名称。
- unitName:STRING 类型标量。用于标识 RocketMQ 的一个具体的业务单元的名称。
- sendMsgTimeout:INT 类型标量,表示单次发送的超时时间,单位是毫秒。必须大于 0,默认值为 3000。如果超过 *sendMsgTimeout *毫秒后仍失败,则再次发送,直到发送次数超过 retryTimes。
- retryTimes:INT 类型标量,表示发送重试次数。必须大于 0,默认值为 5 。
- compressMsgBodyOverHowmuch:INT 类型标量,用于指定消息体超过多大时启用压缩,单位是字节。必须大于 0,默认值为 4096。
- compressLevel:INT 类型标量,表示压缩等级。取值范围为 1-9,默认值为 4 。
- maxMessageSize:INT 类型标量,表示单条消息的最大长度,单位是字节。必须大于 0,默认值为 133632。
- tcpTransportConnectTimeout:INT 类型标量,表示连接超时时间,单位是毫秒。必须大于 0,默认值为 3000。
send
语法
send(producer, data, topic, [tag = ""])
详情
发送数据到 RocketMQ。
返回一个布尔值,true 表示发送成功。
参数
producer 通过 createProducer
创建的 producer 对象。
data STRING、BLOB 类型的标量或向量,表示待发送的数据。
topic STRING 类型标量,表示待发送数据的主题。
tag STRING 类型标量,表示消息的 tag 标签。默认值为空字符串。
示例
producer = createProducer("192.168.1.38:9876", "group1");
msg = "msg1"
send(msg)
msg2 = ["msg1", "msg2"]
send(msg2)
createSubJob
语法
createSubJob(namesrv, groupName, topic, handler, [subExpression = "*"], [consumerConfig])
详情
创建一个 RocketMQ 的订阅任务,用于后台接收 RocketMQ 的数据。
返回任务 ID,类型为 STRING。
参数
namesrv STRING 类型标量,表示 RocketMQ 集群的 NameServer 的地址,形式为 "ip:port" 。
groupName STRING 类型标量,表示消费者组。
topic STRING 类型标量,表示主题。
handler 一个一元函数对象,用于处理消息的函数句柄。接收的参数的数据结构是表。表中包含 BLOB 类型的 msg 列。
subExpression STRING 类型的标量,用于消息过滤,默认值为”*”。
consumerConfig 一个字典,类型为 (STRING, ANY),代表 consumer 配置项,支持的配置项包括:
- sessionCredentials:STRING 类型的数组,代表用户凭证,元素分别为 accessKey,secretKey,accessChannel 。
- messageModel:STRING 类型标量,表示消费模型:
- "CLUSTERING" :默认值,表示集群消费。
- "BROADCASTING" :表示广播消费。
- consumeFromWhere:STRING 类型标量,表示消费者从哪个位置开始消费数据:
- "CONSUME_FROM_LAST_OFFSET":默认值,表示第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费。
- "CONSUME_FROM_FIRST_OFFSET":第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费。
- namesrvDomain:STRING 类型标量。当 RocketMQ 的客户端启动时,客户端会向指定的 NamesrvDomain 发送请求,以获取 Name Server 的地址列表。
- instanceName:STRING 类型标量,用于标识 RocketMQ 客户端实例的唯一名称。
- unitName: STRING 类型标量。用于标识一个具体的业务单元的名称。
- nameSpace:STRING 类型标量,通过使用 nameSpace,用户可以在一个 RocketMQ 集群中创建多个独立的逻辑分区,每个分区拥有独立的 Topic、Producer 和 Consumer。
- consumeThreadCount:INT 类型标量,表示消费线程数。默认值为 1。
- pullMsgThreadPoolCount:INT 类型标量,表示拉取 RocketMQ 数据的线程数。必须大于 0,默认值为 1。
- maxReconsumeTimes:INT 类型标量,表示当消费消息失败时,RocketMQ 会尝试重新发送该消息最多次数。每次重试都会将消息放到队列的末尾,并等待其他消费者来消费。设置的值必须大于 0,不设置此参数时默认不会重试。
- tcpTransportConnectTimeout:INT 类型标量,表示连接超时时间,单位是毫秒。必须大于 0,默认值为 3000。
- asyncPull:BOOL 类型标量,表示在接收到数据后,是否异步执行 handler。默认值为 false,此时可以保证数据不丢失。
- batchSize:INT 类型标量,表示未处理消息的数量达到多少时,handler 才会处理消息。必须大于 0,默认值为 10000。仅在 *asyncPull *设置为 true 时生效。
- throttle:FLOAT 类型标量,表示继上次 handler 处理消息之后,若 batchSize 条件一直未达到,多久后再次处理消息,单位为毫秒。必须大于 0,默认值为 1。仅在 *asyncPull *设置为 true 时生效。
例子
def appendData(table1, data){
table1.append!(data)
}
table1 = table(1:0, ["msg"], [BLOB])
RocketMQ::createSubJob("192.168.1.38:9876", "group1", ”topic1", appendData{table1})
cancelSubJob
语法
cancelSubJob(JobId)
详情
取消后台订阅 RocketMQ 的任务。
返回一个布尔值,如果取消成功会返回 true。
参数
JobId STRING 类型标量,通过 createSubJob
或者是 getSubJobStat
返回的订阅任务 ID。
getSubJobStat
语法
getSubJobStat()
详情
查询当前 RocketMQ 后台订阅任务的信息。
返回一个表,包含如下列:
- jobID:列类型为 STRING ,表示订阅 ID。
- startTime:列类型为 NANOTIMESTAMP,表示任务创建时间。
- endTime:列类型为 NANOTIMESTAMP,表示任务结束时间。任务可以通过
cancelSubJob
来取消。 - firstMsgTime:列类型为 NANOTIMESTAMP,表示第一条数据的接收时间。
- lastMsgTime:列类型为 NANOTIMESTAMP,表示上一条消息的接收时间。
- processedMsgCount:列类型为 LONG,表示成功处理的消息行数。
- failedMsgCount:列类型为 LONG,表示处理失败的消息行数。
- lastErrMsg:列类型为 STRING,表示上一次处理失败的错误信息。
- lastFailedTimestamp:列类型为 NANOTIMESTAMP,表示上一次处理失败的时间。
参数
无