RabbitMQ
在插件市场安装插件
版本要求
- DolphinDB Server: 2.00.10及更高版本
- OS: 64位 Linux
安装步骤
在DolphinDB 客户端中使用 listRemotePlugins 命令查看插件仓库中的插件信息。
login("admin", "123456") listRemotePlugins()
使用 installPlugin 命令完成插件安装。
installPlugin("rabbitmq")
使用
loadPlugin
命令加载插件。loadPlugin("rabbitmq")
用户接口
connection, connectionAMQP 在连接建立失败时会抛出异常
如果某个传入channel的函数调用失败,channel就会失效。所以推荐在调用不同函数时创建新的channel。但是不推荐每次publish时创建新的channel。
channel的成员函数除publish以及consume外,其他函数都是同步函数,如果执行失败,会抛出异常,这可能是因为参数不正确(IllegalArgumentException),可能是因为函数本身运行失败(RuntimeException),也可能是因为connection或channel失效(RuntimeException)。如果是因为后两个原因,此时需要重新创建channel。
在close(channel)之后调用channel的成员函数会抛出异常,提示“Please create channel first”
在close(conneciton)之后调用channel的成员函数会抛出异常,提示“failed to ..., refer to log for more information”,并且日志提示“Frame could not be sent”
在connection断开导致channel失效或channel的成员函数运行失败使channel失效之后再调用channel的成员函数会抛出异常,提示“failed to ..., refer to log for more information”,并且日志提示“Frame could not be sent”
创建channel的过程中如果因为connection已经被关闭或者连接数达到上限,会抛出异常
综上:channel失效之后需要创建新的channel,如果创建channel的过程中如果抛出异常,此时需要重新创建connection。
channel的成员函数publish如果失败会抛出异常。可以通过捕获异常来重新建立connection和channel并重新发送。
channel的成员函数consume本身是同步的,可以知道订阅是否成功。但是consume在后续调用处理消息的回调函数中可能会因为connection或channel失效而失败,此时没办法通知用户发生了错误。
一个channel上不能进行并发操作(也有可能是一个connection上)。如果需要多线程调用,应该创建每个线程的channel(也有可能是connection)。
connection
语法
connection(host, [port=5672], [username='guest'], [password='guest'], [vhost="/"])
参数
host
:主机,字符串标量
port
:端口号,数值类型,默认为5672
username
:用户名,字符串标量,默认为guest
password
:密码,字符串标量,默认为guest
vhost
:虚拟主机,字符串标量,默认为"/"
详情
使用给定信息连接RabbitMQ,返回表示连接的对象。
例子
conn = rabbitmq::connection('localhost', 5672, 'guest', 'guest', "/")
connectionAMQP
语法
connectionAMQP(amqpURIs)
参数
amqpURI
:AMQP协议地址,字符串向量
详情
使用AMQP地址连接RabbitMQ,返回表示连接的对象。
例子
conn = rabbitmq::connectionAMQP(['amqp://guest:guest@192.168.0.1:5672', 'amqp://guest:guest@192.168.0.2:5672', 'amqp://guest:guest@192.168.0.3:5672'])
channel
语法
channel(connection)
参数
connection
:RabbitMQ连接
详情
使用指定连接创建Channel,返回表示Channel的对象
例子
ch = rabbitmq::channel(conn)
declareExchange
语法
declareExchange(channel, name, [type='fanout'], [flags])
参数
channel
:表示Channel的对象
name
:字符串标量
type
:字符串标量,支持的取值:fanout
,direct
,topic
,headers
,consistent_hash
,默认为fanout
flags
:字符串向量,可为空。支持的取值:durable
,autodelete
,passive
,internal
详情
声明Exchange
例子
rabbitmq::declareExchange(ch, 'test-exchange', 'fanout', ['autodelete'])
bindExchange
语法
bindExchange(channel, source, target, routingkey)
参数
channel
:通道
source
:source Exchange名称
target
:target Exchange名称
routingkey
:路由规则名称
详情
绑定Exchange
例子
rabbitmq::bindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')
unbindExchange
语法
bindExchange(channel, source, target, routingkey)
参数
同 bindExchange
详情
解绑Exchange
例子
rabbitmq::unbindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')
removeExchange
语法
removeExchange(channel, name, [flags])
参数
channel
:通道
name
:Exchange名称
flags
:字符串向量,可为空。支持的取值:ifunused
详情
移除Exchange
例子
rabbitmq::removeExchange(ch, 'test-exchange')
declareQueue
语法
declareQueue(channel, name, [flags], [arguments])
参数
channel
:表示Channel的对象
name
:声明的队列名,字符串标量
flags
:队列标志,字符串向量,可为空。支持的取值:durable
,autodelete
,passive
,exclusive
arguments
:队列参数,字典,可为空。键为字符串,值支持的类型:STRING
,CHAR
,LONG
,INT
,BOOL
详情
声明Queue
例子
arguments = dict(STRING, ANY)
arguments['x-max-length'] = 10000
arguments['x-overflow'] = 'drop-head'
arguments['x-queue-type'] = 'quorum'
rabbitmq::declareQueue(ch, 'test', ['exclusive'], arguments)
bindQueue
语法
bindQueue(channel, exchange, queue, routingkey)
参数
channel
:通道
exchange
:source Exchange名称
queue
:target Queue名称
routingkey
:路由规则名称
详情
绑定Queue
例子
rabbitmq::bindQueue(ch, 'test-exchange1', 'test-queue1', 'rule1')
unbindQueue
语法
unbindQueue(channel, exchange, queue, routingkey)
参数
同bindQueue
详情
解绑Queue
例子
rabbitmq::unbindQueue(ch, 'test-exchange1', 'test-queue', 'rule1')
removeQueue
语法
removeQueue(channel, name, [flags])
参数
channel
:通道
name
:Queue名称
flags
:可选
详情
移除Queue
例子
rabbitmq::removeQueue(ch, 'test-queue1')
publish
语法
publish(channel, exchange, routingKey, message, format='default', [flags])
参数
channel
:表示Channel的对象
exchange
:字符串标量
routingKey
:字符串标量
message
:要发送的信息
format
:指定信息格式,字符串标量或函数(由createJSONFormatter或createCSVFormatter创建)
format | 取值 |
---|---|
default | DolphinDB默认格式 |
bytestream | 字节流 |
protobuf | protocol buffer |
flags
:字符串向量,可为空。支持的取值:mandatory
,immediate
详情
以指定方式发布DolphinDB对象
例子
rabbitmq::publish(ch, 'test-exchange1', 'rule1', 'Hello World1', 'bytestream')
consume
语法
consume(channel, queue, handle, parser, tag, [flags])
参数
channel
:表示Channel的对象
queue
:订阅的队列名称,字符串标量
handle
:一元函数,用于处理订阅的数据。请注意,传入的一元函数中不能存在对DFS表的操作,例如:读取或写入DFS表,获取DFS表的schema等。
parser
:订阅数据的格式,字符串标量,取值范围同publish
,或者是解析函数(由createJSONParser或createCSVParser创建)
tag
:消费者标识,字符串标量
flags
:字符串向量,可为空。支持的取值:nolocal
,noack
,exclusive
详情
订阅队列
例子
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue1', handle, 'bytestream', 'consumer1')
cancelConsume
语法
cancelConsume(channel, tag)
参数
channel
:表示Channel的对象
tag
:消费者标识,字符串标量
详情
取消consume
例子
rabbitmq::cancelConsume(ch, 'consumer1')
后台订阅相关接口
createSubJob
语法
createSubJob(queue, handle, parser, tag, hosts, ports, [username='guest'], [password='guest'], [vhost="/"], [flags])
参数
queue
:订阅的队列名称,字符串标量
handle
:处理订阅数据的handle,能接受一个参数的函数
parser
:解析数据的字符串或函数(同consume)
tag
:消费者标识,字符串标量
hosts
:主机,字符串向量,为集群中每一台主机的IP地址
ports
:端口号,INT类型向量,为集群中每一台主机的端口号,注意,端口号的顺序要与hosts中的IP地址匹配。
username
:用户名,字符串标量,可为空,默认为guest
password
:密码,字符串标量,可为空,默认为guest
vhost
:虚拟主机,字符串标量,可为空,默认为"/"
flags
:字符串向量,可为空。支持的取值:nolocal,noack,exclusive
详情
后台订阅
例子
def handle(msg){
}
rabbitmq::createSubJob('test-queue2', handle, 'bytestream', 'consumer1',['localhost'], [5672], 'guest', 'guest')
getSubJobStat
语法
getSubJobStat()
参数
无
详情
获取所有后台订阅信息(订阅id,queue,tag,订阅时间戳)
例子
tb = rabbitmq::getSubJobStat()
print(tb)
cancelSubJob
语法
cancelSubJob(订阅id)
详情
取消某一个后台订阅
打/解包功能
createCSVFormatter
rabbitmq::createCSVFormatter([format], [delimiter=','], [rowDelimiter=';'])
创建一个CSV格式的Formatter函数。
参数:
- 'format': 是一个STRING类型的向量。
- 'delimiter': 是列之间的分隔符,默认是','。
- 'rowDelimiter': 是行之间的分隔符,默认是';'。
例子:
MyFormat = take("", 3)
MyFormat[1] = "0.000"
formatter = rabbitmq::createCSVFormatter(MyFormat,',',';')
createCSVParser
rabbitmq::createCSVParser(schema, [delimiter=','], [rowDelimiter=';'])
创建一个CSV格式的Parser函数。
参数:
- 'schema': 是一个包含各列数据类型的向量。
- 'delimiter': 是列之间的分隔符,默认是','。
- 'rowDelimiter': 是行之间的分隔符,默认是';'。
例子:
parser = rabbitmq::createCSVParser([INT,STRING,INT], ',',';')
createProtoBuformatter
rabbitmq::createProtoBuformatter()
创建一个protobuf格式的Formatter函数。
参数:无。
例子:
formatter = rabbitmq::createProtoBuformatter()
createProtoBufParser
rabbitmq::createProtoBufParser()
创建一个JSON格式的Parser函数。
参数:无
例子:
parser = rabbitmq::createProtoBufParser()
createJSONFormatter
rabbitmq::createJSONFormatter()
创建一个JSON格式的Formatter函数。
参数:无。
例子:
formatter = rabbitmq::createJSONFormatter()
createJSONParser
rabbitmq::createJSONParser(schema, colNames)
创建一个JSON格式的Parser函数。
参数:
- 'schema' 是一个向量,表示各列的数据类型。
- 'colNames' 是一个向量,表示列名。
例子:
parser = rabbitmq::createJSONParser([INT,TIMESTAMP,INT], ["id","ts","volume"])
close
rabbitmq::close(handle)
关闭一个connection或者一个channel,注意,如果关闭connection,其上的channel也会自动关闭
参数:
- 'handle' 是一个handle, 是由connection接口或者channel接口返回的
例子:
conn = rabbitmq::connection('localhost', 5672, 'guest', 'guest', "/")
rabbitmq::close(conn)
closeAll
rabbitmq::closeAll()
关闭所有已经建立的connection和channel。
例子:
rabbitmq::closeAll()
getConnections
语法
rabbitmq::getConnections()
详情
获取创所有已创建的 RabbitMQ 连接信息。
返回一个元组,每个元素都是一个字典,其键值如下:
- connection:连接句柄。
- createTime:创建连接的时间,类型为 TIMESTAMP。
getChannels
语法
rabbitmq::getChannels([connection])
详情
如果填写了 connection 参数,则查询通过该 connection 所创建的通道信息;否则则查询所有已创建的通道信息。
返回一个元组,每个元素都是一个字典,其键值如下:
- connection:通道句柄所对应的连接句柄。
- channel:通道句柄。
- createTime:表示创建通道的时间,类型为 TIMESTAMP。
参数
connection 连接句柄。
完整例子
GUI1
// 加载插件
loadPlugin("/home/yxu/Desktop/DolphinDBPlugin/rabbitmq/build/PluginRabbitMQ.txt")
// 建立连接
conn = rabbitmq::connection('localhost', 5672, 'guest', 'guest', '');
// 创建通道
ch = rabbitmq::channel(conn)
// 声明Exchange
rabbitmq::declareExchange(ch, 'test-exchange', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange1', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange2', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange3', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange4', 'fanout', ['durable'])
// 绑定Exchange
rabbitmq::bindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')
// 解绑Exchange
rabbitmq::unbindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')
// 移除Exchange
rabbitmq::removeExchange(ch, 'test-exchange')
rabbitmq::removeExchange(ch, 'test-exchange1')
rabbitmq::removeExchange(ch, 'test-exchange2')
rabbitmq::removeExchange(ch, 'test-exchange3')
rabbitmq::removeExchange(ch, 'test-exchange4')
// 声明Queue
rabbitmq::declareQueue(ch, 'test-queue1', ['durable'])
rabbitmq::declareQueue(ch, 'test-queue2', ['durable'])
rabbitmq::declareQueue(ch, 'test-queue3', ['durable'])
rabbitmq::declareQueue(ch, 'test-queue4', ['durable'])
// 绑定Queue
rabbitmq::bindQueue(ch, 'test-exchange1', 'test-queue1', 'rule1')
rabbitmq::bindQueue(ch, 'test-exchange2', 'test-queue2', 'rule1')
rabbitmq::bindQueue(ch, 'test-exchange3', 'test-queue3', 'rule1')
rabbitmq::bindQueue(ch, 'test-exchange4', 'test-queue4', 'rule1')
// 解绑Queue
rabbitmq::unbindQueue(ch, 'test-exchange1', 'test-queue', 'rule1')
// 移除Queue
rabbitmq::removeQueue(ch, 'test-queue1')
rabbitmq::removeQueue(ch, 'test-queue2')
rabbitmq::removeQueue(ch, 'test-queue3')
rabbitmq::removeQueue(ch, 'test-queue4')
// 发数据
rabbitmq::publish(ch, 'test-exchange1', 'rule1', 'Hello World1', 'bytestream')
rabbitmq::publish(ch, 'test-exchange2', 'rule1', 'Hello World1', 'bytestream')
// 取数据
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue1', handle, 'bytestream', 'consumer1')
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue2', handle, 'bytestream', 'consumer2')
// 取消取数据
rabbitmq::cancelConsume(ch, 'consumer1')
rabbitmq::cancelConsume(ch, 'consumer2')
// 发送json格式数据
formatter = rabbitmq::createJSONFormatter()
data = table(1..10 as id, take(now(), 10) as ts, rand(10, 10) as volume)
rabbitmq::publish(ch, 'test-exchange3', 'rule1', data, formatter)
// 发送csv格式数据
MyFormat = take("", 3)
MyFormat[1] = "0.000"
print(MyFormat)
data = table(1..10 as id, take(now(), 10) as ts, rand(10, 10) as volume)
formatter = rabbitmq::createCSVFormatter(MyFormat,',',';')
rabbitmq::publish(ch, 'test-exchange4', 'rule1', data, formatter)
// 接收json格式数据
parser = rabbitmq::createJSONParser([INT,TIMESTAMP,INT], ["id","ts","volume"])
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue3', handle, parser, 'consumer3')
// 接收csv格式数据
parser1 = rabbitmq::createCSVParser([INT,STRING,INT], ',',';')
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue4', handle, parser1, 'consumer4')
rabbitmq::cancelConsume(ch, 'consumer3')
rabbitmq::cancelConsume(ch, 'consumer4')
// 获取后台订阅信息
tb = rabbitmq::getSubJobStat()
print(tb)
// 取消某个后台订阅
rabbitmq::cancelSubJob(订阅id)
GUI2
// 订阅完成GUI2可关闭
def handle(msg){
}
rabbitmq::createSubJob('test-queue2', handle, 'bytestream', 'consumer1',['localhost'], [5672], 'guest', 'guest')
// 后台订阅json格式数据
parser = rabbitmq::createJSONParser([INT,TIMESTAMP,INT], ["id","ts","volume"])
def handle(msg){
}
rabbitmq::createSubJob('test-queue3', handle, parser, 'consumer1',['localhost'], [5672], 'guest', 'guest')
// 后台订阅csv格式数据
parser1 = rabbitmq::createCSVParser([INT,STRING,INT], ',',';')
def handle(msg){
}
rabbitmq::createSubJob('test-queue4', handle, parser1, 'consumer1',['localhost'], [5672], 'guest', 'guest')