RabbitMQ

在插件市场安装插件

版本要求

  • DolphinDB Server: 2.00.10及更高版本
  • OS: 64位 Linux

安装步骤

  1. 在DolphinDB 客户端中使用 listRemotePlugins 命令查看插件仓库中的插件信息。

    login("admin", "123456")
    listRemotePlugins()
  2. 使用 installPlugin 命令完成插件安装。

    installPlugin("rabbitmq")
  3. 使用 loadPlugin 命令加载插件。

    loadPlugin("rabbitmq")

用户接口

  • connection, connectionAMQP 在连接建立失败时会抛出异常

  • 如果某个传入channel的函数调用失败,channel就会失效。所以推荐在调用不同函数时创建新的channel。但是不推荐每次publish时创建新的channel。

  • channel的成员函数除publish以及consume外,其他函数都是同步函数,如果执行失败,会抛出异常,这可能是因为参数不正确(IllegalArgumentException),可能是因为函数本身运行失败(RuntimeException),也可能是因为connection或channel失效(RuntimeException)。如果是因为后两个原因,此时需要重新创建channel。

    • 在close(channel)之后调用channel的成员函数会抛出异常,提示“Please create channel first”

    • 在close(conneciton)之后调用channel的成员函数会抛出异常,提示“failed to ..., refer to log for more information”,并且日志提示“Frame could not be sent”

    • 在connection断开导致channel失效或channel的成员函数运行失败使channel失效之后再调用channel的成员函数会抛出异常,提示“failed to ..., refer to log for more information”,并且日志提示“Frame could not be sent”

    • 创建channel的过程中如果因为connection已经被关闭或者连接数达到上限,会抛出异常

    • 综上:channel失效之后需要创建新的channel,如果创建channel的过程中如果抛出异常,此时需要重新创建connection。

  • channel的成员函数publish如果失败会抛出异常。可以通过捕获异常来重新建立connection和channel并重新发送。

  • channel的成员函数consume本身是同步的,可以知道订阅是否成功。但是consume在后续调用处理消息的回调函数中可能会因为connection或channel失效而失败,此时没办法通知用户发生了错误。

  • 一个channel上不能进行并发操作(也有可能是一个connection上)。如果需要多线程调用,应该创建每个线程的channel(也有可能是connection)。

connection

语法

connection(host, [port=5672], [username='guest'], [password='guest'], [vhost="/"])

参数

host:主机,字符串标量

port:端口号,数值类型,默认为5672

username:用户名,字符串标量,默认为guest

password:密码,字符串标量,默认为guest

vhost:虚拟主机,字符串标量,默认为"/"

详情

使用给定信息连接RabbitMQ,返回表示连接的对象。

例子

conn = rabbitmq::connection('localhost', 5672, 'guest', 'guest', "/")

connectionAMQP

语法

connectionAMQP(amqpURIs)

参数

amqpURI:AMQP协议地址,字符串向量

详情

使用AMQP地址连接RabbitMQ,返回表示连接的对象。

例子

conn = rabbitmq::connectionAMQP(['amqp://guest:guest@192.168.0.1:5672', 'amqp://guest:guest@192.168.0.2:5672', 'amqp://guest:guest@192.168.0.3:5672'])

channel

语法

channel(connection)

参数

connection:RabbitMQ连接

详情

使用指定连接创建Channel,返回表示Channel的对象

例子

ch = rabbitmq::channel(conn)

declareExchange

语法

declareExchange(channel, name, [type='fanout'], [flags])

参数

channel:表示Channel的对象

name:字符串标量

type:字符串标量,支持的取值:fanoutdirecttopicheadersconsistent_hash,默认为fanout

flags:字符串向量,可为空。支持的取值:durableautodeletepassiveinternal

详情

声明Exchange

例子

rabbitmq::declareExchange(ch, 'test-exchange', 'fanout', ['autodelete'])

bindExchange

语法

bindExchange(channel, source, target, routingkey)

参数

channel:通道

source:source Exchange名称

target:target Exchange名称

routingkey:路由规则名称

详情

绑定Exchange

例子

rabbitmq::bindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')

unbindExchange

语法

bindExchange(channel, source, target, routingkey)

参数

同 bindExchange

详情

解绑Exchange

例子

rabbitmq::unbindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')

removeExchange

语法

removeExchange(channel, name, [flags])

参数

channel:通道

name:Exchange名称

flags:字符串向量,可为空。支持的取值:ifunused

详情

移除Exchange

例子

rabbitmq::removeExchange(ch, 'test-exchange')

declareQueue

语法

declareQueue(channel, name, [flags], [arguments])

参数

channel:表示Channel的对象

name:声明的队列名,字符串标量

flags:队列标志,字符串向量,可为空。支持的取值:durableautodeletepassiveexclusive

arguments:队列参数,字典,可为空。键为字符串,值支持的类型:STRINGCHARLONGINTBOOL

详情

声明Queue

例子

arguments = dict(STRING, ANY)
arguments['x-max-length'] = 10000
arguments['x-overflow'] = 'drop-head'
arguments['x-queue-type'] = 'quorum'
rabbitmq::declareQueue(ch, 'test', ['exclusive'], arguments)

bindQueue

语法

bindQueue(channel, exchange, queue, routingkey)

参数

channel:通道

exchange:source Exchange名称

queue:target Queue名称

routingkey:路由规则名称

详情

绑定Queue

例子

rabbitmq::bindQueue(ch, 'test-exchange1', 'test-queue1', 'rule1')

unbindQueue

语法

unbindQueue(channel, exchange, queue, routingkey)

参数

同bindQueue

详情

解绑Queue

例子

rabbitmq::unbindQueue(ch, 'test-exchange1', 'test-queue', 'rule1')

removeQueue

语法

removeQueue(channel, name, [flags])

参数

channel:通道

name:Queue名称

flags:可选

详情

移除Queue

例子

rabbitmq::removeQueue(ch, 'test-queue1')

publish

语法

publish(channel, exchange, routingKey, message, format='default', [flags])

参数

channel:表示Channel的对象

exchange:字符串标量

routingKey:字符串标量

message:要发送的信息

format:指定信息格式,字符串标量或函数(由createJSONFormatter或createCSVFormatter创建)

format取值
defaultDolphinDB默认格式
bytestream字节流
protobufprotocol buffer

flags:字符串向量,可为空。支持的取值:mandatoryimmediate

详情

以指定方式发布DolphinDB对象

例子

rabbitmq::publish(ch, 'test-exchange1', 'rule1', 'Hello World1', 'bytestream')

consume

语法

consume(channel, queue, handle, parser, tag, [flags])

参数

channel:表示Channel的对象

queue:订阅的队列名称,字符串标量

handle:一元函数,用于处理订阅的数据。请注意,传入的一元函数中不能存在对DFS表的操作,例如:读取或写入DFS表,获取DFS表的schema等。

parser:订阅数据的格式,字符串标量,取值范围同publish,或者是解析函数(由createJSONParser或createCSVParser创建)

tag:消费者标识,字符串标量

flags:字符串向量,可为空。支持的取值:nolocalnoackexclusive

详情

订阅队列

例子

def handle(msg){
}
rabbitmq::consume(ch, 'test-queue1', handle, 'bytestream', 'consumer1')

cancelConsume

语法

cancelConsume(channel, tag)

参数

channel:表示Channel的对象

tag:消费者标识,字符串标量

详情

取消consume

例子

rabbitmq::cancelConsume(ch, 'consumer1')

后台订阅相关接口

createSubJob

语法

createSubJob(queue, handle, parser, tag, hosts, ports, [username='guest'], [password='guest'], [vhost="/"], [flags])

参数

queue:订阅的队列名称,字符串标量

handle:处理订阅数据的handle,能接受一个参数的函数

parser:解析数据的字符串或函数(同consume)

tag:消费者标识,字符串标量

hosts:主机,字符串向量,为集群中每一台主机的IP地址

ports:端口号,INT类型向量,为集群中每一台主机的端口号,注意,端口号的顺序要与hosts中的IP地址匹配。

username:用户名,字符串标量,可为空,默认为guest

password:密码,字符串标量,可为空,默认为guest

vhost:虚拟主机,字符串标量,可为空,默认为"/"

flags:字符串向量,可为空。支持的取值:nolocal,noack,exclusive

详情

后台订阅

例子

def handle(msg){
}
rabbitmq::createSubJob('test-queue2', handle, 'bytestream', 'consumer1',['localhost'], [5672], 'guest', 'guest')

getSubJobStat

语法

getSubJobStat()

参数

详情

获取所有后台订阅信息(订阅id,queue,tag,订阅时间戳)

例子

tb = rabbitmq::getSubJobStat()
print(tb)

cancelSubJob

语法

cancelSubJob(订阅id)

详情

取消某一个后台订阅

打/解包功能

createCSVFormatter

rabbitmq::createCSVFormatter([format], [delimiter=','], [rowDelimiter=';'])

创建一个CSV格式的Formatter函数。

参数:

  • 'format': 是一个STRING类型的向量。
  • 'delimiter': 是列之间的分隔符,默认是','。
  • 'rowDelimiter': 是行之间的分隔符,默认是';'。

例子:

MyFormat = take("", 3)
MyFormat[1] = "0.000"
formatter = rabbitmq::createCSVFormatter(MyFormat,',',';')

createCSVParser

rabbitmq::createCSVParser(schema, [delimiter=','], [rowDelimiter=';'])

创建一个CSV格式的Parser函数。

参数:

  • 'schema': 是一个包含各列数据类型的向量。
  • 'delimiter': 是列之间的分隔符,默认是','。
  • 'rowDelimiter': 是行之间的分隔符,默认是';'。

例子:

parser = rabbitmq::createCSVParser([INT,STRING,INT], ',',';')

createProtoBuformatter

rabbitmq::createProtoBuformatter()

创建一个protobuf格式的Formatter函数。

参数:无。

例子:

formatter = rabbitmq::createProtoBuformatter()

createProtoBufParser

rabbitmq::createProtoBufParser()

创建一个JSON格式的Parser函数。

参数:无

例子:

parser = rabbitmq::createProtoBufParser()

createJSONFormatter

rabbitmq::createJSONFormatter()

创建一个JSON格式的Formatter函数。

参数:无。

例子:

formatter = rabbitmq::createJSONFormatter()

createJSONParser

rabbitmq::createJSONParser(schema, colNames)

创建一个JSON格式的Parser函数。

参数:

  • 'schema' 是一个向量,表示各列的数据类型。
  • 'colNames' 是一个向量,表示列名。

例子:

parser = rabbitmq::createJSONParser([INT,TIMESTAMP,INT], ["id","ts","volume"])

close

rabbitmq::close(handle)

关闭一个connection或者一个channel,注意,如果关闭connection,其上的channel也会自动关闭

参数:

  • 'handle' 是一个handle, 是由connection接口或者channel接口返回的

例子:

conn = rabbitmq::connection('localhost', 5672, 'guest', 'guest', "/")
rabbitmq::close(conn)

closeAll

rabbitmq::closeAll()

关闭所有已经建立的connection和channel。

例子:

rabbitmq::closeAll()

getConnections

语法

rabbitmq::getConnections()

详情

获取创所有已创建的 RabbitMQ 连接信息。

返回一个元组,每个元素都是一个字典,其键值如下:

  • connection:连接句柄。
  • createTime:创建连接的时间,类型为 TIMESTAMP。

getChannels

语法

rabbitmq::getChannels([connection])

详情

如果填写了 connection 参数,则查询通过该 connection 所创建的通道信息;否则则查询所有已创建的通道信息。

返回一个元组,每个元素都是一个字典,其键值如下:

  • connection:通道句柄所对应的连接句柄。
  • channel:通道句柄。
  • createTime:表示创建通道的时间,类型为 TIMESTAMP。

参数

connection 连接句柄。

完整例子

GUI1

// 加载插件
loadPlugin("/home/yxu/Desktop/DolphinDBPlugin/rabbitmq/build/PluginRabbitMQ.txt")
// 建立连接
conn = rabbitmq::connection('localhost', 5672, 'guest', 'guest', '');
// 创建通道
ch = rabbitmq::channel(conn)
// 声明Exchange
rabbitmq::declareExchange(ch, 'test-exchange', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange1', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange2', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange3', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange4', 'fanout', ['durable'])
// 绑定Exchange
rabbitmq::bindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')
// 解绑Exchange
rabbitmq::unbindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')
// 移除Exchange
rabbitmq::removeExchange(ch, 'test-exchange')
rabbitmq::removeExchange(ch, 'test-exchange1')
rabbitmq::removeExchange(ch, 'test-exchange2')
rabbitmq::removeExchange(ch, 'test-exchange3')
rabbitmq::removeExchange(ch, 'test-exchange4')
// 声明Queue
rabbitmq::declareQueue(ch, 'test-queue1', ['durable'])
rabbitmq::declareQueue(ch, 'test-queue2', ['durable'])
rabbitmq::declareQueue(ch, 'test-queue3', ['durable'])
rabbitmq::declareQueue(ch, 'test-queue4', ['durable'])
// 绑定Queue
rabbitmq::bindQueue(ch, 'test-exchange1', 'test-queue1', 'rule1')
rabbitmq::bindQueue(ch, 'test-exchange2', 'test-queue2', 'rule1')
rabbitmq::bindQueue(ch, 'test-exchange3', 'test-queue3', 'rule1')
rabbitmq::bindQueue(ch, 'test-exchange4', 'test-queue4', 'rule1')
// 解绑Queue
rabbitmq::unbindQueue(ch, 'test-exchange1', 'test-queue', 'rule1')
// 移除Queue
rabbitmq::removeQueue(ch, 'test-queue1')
rabbitmq::removeQueue(ch, 'test-queue2')
rabbitmq::removeQueue(ch, 'test-queue3')
rabbitmq::removeQueue(ch, 'test-queue4')
// 发数据
rabbitmq::publish(ch, 'test-exchange1', 'rule1', 'Hello World1', 'bytestream')
rabbitmq::publish(ch, 'test-exchange2', 'rule1', 'Hello World1', 'bytestream')
// 取数据
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue1', handle, 'bytestream', 'consumer1')

def handle(msg){
}
rabbitmq::consume(ch, 'test-queue2', handle, 'bytestream', 'consumer2')

// 取消取数据
rabbitmq::cancelConsume(ch, 'consumer1')
rabbitmq::cancelConsume(ch, 'consumer2')

// 发送json格式数据
formatter = rabbitmq::createJSONFormatter()
data = table(1..10 as id, take(now(), 10) as ts, rand(10, 10) as volume)
rabbitmq::publish(ch, 'test-exchange3', 'rule1', data, formatter)

// 发送csv格式数据
MyFormat = take("", 3)
MyFormat[1] = "0.000"
print(MyFormat)
data = table(1..10 as id, take(now(), 10) as ts, rand(10, 10) as volume)
formatter = rabbitmq::createCSVFormatter(MyFormat,',',';')
rabbitmq::publish(ch, 'test-exchange4', 'rule1', data, formatter)

// 接收json格式数据
parser = rabbitmq::createJSONParser([INT,TIMESTAMP,INT], ["id","ts","volume"])
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue3', handle, parser, 'consumer3')

// 接收csv格式数据
parser1 = rabbitmq::createCSVParser([INT,STRING,INT], ',',';')
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue4', handle, parser1, 'consumer4')

rabbitmq::cancelConsume(ch, 'consumer3')
rabbitmq::cancelConsume(ch, 'consumer4')

// 获取后台订阅信息
tb = rabbitmq::getSubJobStat()
print(tb)
// 取消某个后台订阅
rabbitmq::cancelSubJob(订阅id)

GUI2

// 订阅完成GUI2可关闭
def handle(msg){
}
rabbitmq::createSubJob('test-queue2', handle, 'bytestream', 'consumer1',['localhost'], [5672], 'guest', 'guest')

// 后台订阅json格式数据
parser = rabbitmq::createJSONParser([INT,TIMESTAMP,INT], ["id","ts","volume"])
def handle(msg){
}
rabbitmq::createSubJob('test-queue3', handle, parser, 'consumer1',['localhost'], [5672], 'guest', 'guest')

// 后台订阅csv格式数据
parser1 = rabbitmq::createCSVParser([INT,STRING,INT], ',',';')
def handle(msg){
}
rabbitmq::createSubJob('test-queue4', handle, parser1, 'consumer1',['localhost'], [5672], 'guest', 'guest')