RabbitMQ
Installation (with installPlugin
)
Required server version: DolphinDB 2.00.10 or higher
OS: Linux x64
Installation Steps:
(1) Use listRemotePlugins to check plugin information in the plugin repository.
Note: For plugins not included in the provided list, you can install through precompiled binaries or compile from source. These files can be accessed from our GitHub repository by switching to the appropriate version branch.
login("admin", "123456")
listRemotePlugins()
(2) Invoke installPlugin for plugin installation.
installPlugin("rabbitmq")
(3) Use loadPlugin to load the plugin before using the plugin methods.
loadPlugin("rabbitmq")
Method References
Basic Methods
connection
Syntax
connection(host, [port=5672], [username='guest'], [password='guest'], [vhost="/"])
Parameters
- host: A STRING scalar indicating the IP address of the host.
- port (optional): An INT scalar indicating the port number of the host. The default value is 5672.
- username (optional): A STRING scalar indicating the username. The default value is 'guest'.
- password (optional): A STRING scalar indicating the password. The default value is 'guest'.
- vhost (optional): A STRING scalar indicating the virtual host. The default value is "/".
Details
Connect to RabbitMQ using the specified information and return a connection object.
Examples
conn = rabbitmq::connection('localhost', 5672, 'guest', 'guest', "/")
connectionAMQP
Syntax
connectionAMQP(amqpURI)
Parameters
- amqpURI: A STRING vector indicating the AMQP URI(s).
Details
Connect to RabbitMQ using the AMQP URI(s) and return a connection object.
Examples
conn = rabbitmq::connectionAMQP(['amqp://guest:guest@192.168.0.1:5672', 'amqp://guest:guest@192.168.0.2:5672', 'amqp://guest:guest@192.168.0.3:5672'])
channel
Syntax
channel(connection)
Parameters
- connection: A RabbitMQ connection object.
Details
Create a channel using the specified connection object and return a channel object.
Examples
ch = rabbitmq::channel(conn)
declareExchange
Syntax
declareExchange(channel, name, [type='fanout'], [flags])
Parameters
- channel: A channel object.
- name: A STRING scalar indicating the name of the exchange.
- type (optional): A STRING scalar indicating the type of the exchange. It can be 'fanout' (default), 'direct', 'topic', 'headers', or 'consistent_hash'.
- flags (optional): A STRING vector indicating the flags of the exchange. It can be 'durable', 'autodelete', 'passive', or 'internal'.
Details
Declare an exchange.
Examples
rabbitmq::declareExchange(ch, 'test-exchange', 'fanout', ['autodelete'])
bindExchange
Syntax
bindExchange(channel, source, target, routingkey)
Parameters
- channel: A channel object.
- source: The name of the source exchange to be bound.
- target: The name of the target exchange to bind to.
- routingkey: The routing key binding the two exchanges.
Details
Bind an exchange to another exchange.
Examples
rabbitmq::bindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')
unbindExchange
Syntax
connection(host, [port=5672], [username='guest'], [password='guest'], [vhost="/"])
Parameters
Same as those of the bindExchange
method.
Details
Remove a binding from an exchange to another exchange.
Examples
rabbitmq::unbindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')
removeExchange
Syntax
removeExchange(channel, name, [flags])
Parameters
- channel: A channel object.
- name: A STRING scalar indicating the name of the exchange.
- flags (optional): A STRING vector indicating the flags of the exchange. It can be 'ifunused'.
Details
Remove an exchange.
Examples
rabbitmq::removeExchange(ch, 'test-exchange')
declareQueue
Syntax
declareQueue(channel, name, [flags], [arguments])
Parameters
- channel: A channel object.
- name: A STRING scalar indicating the name of the queue.
- flags (optional): A STRING vector indicating the flags of the queue. It can be 'durable', 'autodelete', 'passive', or 'exclusive'.
- arguments (optional): A dictionary of STRING keys and STRING/CHAR/LONG/INT/BOOL values, indicating the arguments in the queue.
Details
Declare a queue.
Examples
arguments = dict(STRING, ANY)
arguments['x-max-length'] = 10000
arguments['x-overflow'] = 'drop-head'
arguments['x-queue-type'] = 'quorum'
rabbitmq::declareQueue(ch, 'test', ['exclusive'], arguments)
bindQueue
Syntax
bindQueue(channel, exchange, queue, routingkey)
Parameters
- channel: A channel object.
- exchange: The name of the exchange to be bound.
- queue: The name of the queue to bind to.
- routingkey: The routing key binding the exchange and the queue.
Details
Bind an exchange to a queue.
Examples
rabbitmq::bindQueue(ch, 'test-exchange1', 'test-queue1', 'rule1')
unbindQueue
Syntax
unbindQueue(channel, exchange, queue, routingkey)
Parameters
Same as those of the bindQueue
method.
Details
Remove a binding from an exchange to a queue.
Examples
rabbitmq::unbindQueue(ch, 'test-exchange1', 'test-queue', 'rule1')
removeQueue
Syntax
removeQueue(channel, name, [flags])
Parameters
- channel: A channel object.
- name: A STRING scalar indicating the name of the queue.
- flags (optional): A STRING vector indicating the flags of the queue. It can be ‘ifunused' or 'ifempty'.
Details
Remove a queue.
Examples
rabbitmq::removeQueue(ch, 'test-queue1')
publish
Syntax
publish(channel, exchange, routingKey, message, format='default', [flags])
Parameters
- channel: A channel object.
- exchange: The name of the exchange to publish to.
- routingkey: The routing key specifying which queue to publish to.
- message: The message to be published.
- format: Used to package the subscribed data into specified
format. It can be:
- A function created by method
createJsonFormatter
orcreateCsvFormatter
. - A STRING scalar that can be:
- default: DolphinDB’s default format
- bytestream: byte stream format
- protobuf: protocol buffer format
- A function created by method
- flags (optional): A STRING vector indicating the flags of the method. It can be 'mandatory' or 'immediate'.
Details
Publish a message from DolphinDB to an exchange in the specified format.
Examples
rabbitmq::publish(ch, 'test-exchange1', 'rule1', 'Hello World1', 'bytestream')
consume
Syntax
consume(channel, queue, handle, parser, tag, [flags], [cancelCallback])
Parameters
- channel: A channel object.
- queue: The name of the subscribed queue.
- handle: A unary function used to process the subscribed data. Note that the function cannot operate on DFS tables, such as reading, writing, or schema retrieval.
- parser: Used to parse the subscribed data into specified format.
It can be:
- A function created by methods
createJSONParser
orcreateCSVParser
. - A STRING scalar that can be:
- default: DolphinDB’s default format
- bytestream: byte stream format
- protobuf: protocol buffer format
- A function created by methods
- tag: A STRING scalar indicating the consumer tag.
- flags (optional): A STRING vector indicating the flags of the method. It can be 'nolocal', 'noack', or 'exclusive'.
- cancelCallback (optional): A user-defined unary function, which takes the parameter tag as its input. This function is called when a subscription is canceled due to RabbitMQ server issues, such as the subscribed queue being deleted or the node storing the queue being terminated.
Details
Subscribe to a queue.
Examples
def handleMsg(msg){
// handle received message
}
def cancelNotify(tag){
writeLog(tag)
}
rabbitmq::consume(ch, 'test-queue1', handleMsg, 'bytestream', 'consumer1', ,cancelNotify)
cancelConsume
Syntax
cancelConsume(channel, tag)
Parameters
- channel: A channel object.
- tag: A STRING scalar indicating the consumer tag.
Details
Cancel the subscription to a queue.
Examples
rabbitmq::cancelConsume(ch, 'consumer1')
Background Subscription
createSubJob
Syntax
createSubJob(queue, handle, parser, tag, hosts, ports, [username='guest'], [password='guest'], [vhost="/"], [flags])
Parameters
- queue: The name of the subscribed queue.
- handle: A unary function used to process the subscribed data.
- parser: A STRING scalar or a function used to parse the
subscribed data into specified format (same as method
consume
). - tag: A STRING scalar indicating the consumer tag.
- hosts: A STRING vector indicating the IP address of each host in the cluster.
- ports: An INT vector indicating the port number of each host in the cluster, corresponding to the elements in parameter hosts.
- username (optional): A STRING scalar indicating the username. The default value is 'guest'.
- password (optional): A STRING scalar indicating the password. The default value is 'guest'.
- vhost (optional): A STRING scalar indicating the virtual host. The default value is "/".
- flags (optional): A STRING vector indicating the flags of the method. It can be 'nolocal', 'noack', or 'exclusive'.
- cancelCallback (optional): A user-defined unary function, which takes the parameter tag as its input. This function is called when a subscription is canceled due to RabbitMQ server issues, such as the subscribed queue being deleted or the node storing the queue being terminated.
Details
Subscribe to a queue in the background.
Examples
def handle(msg){
}
rabbitmq::createSubJob('test-queue2', handle, 'bytestream', 'consumer1',['localhost'], [5672], 'guest', 'guest')
getSubJobStat
Syntax
getSubJobStat()
Parameter
None
Details
Get the information on all subscriptions in the background and return the following columns:
- subJobId: subscription ID
- subQueue: name of the subscribed queue
- tag: consumer tag
- createTimestamp: creation timestamp of the subscription
- errMsg: error message thrown when the subscription is canceled
Examples
tb = rabbitmq::getSubJobStat()
print(tb)
cancelSubJob
Syntax
cancelSubJob(subscription)
Parameter
- subscription: The ID of the subscription to be canceled.
Details
Cancel a subscription in the background.
Formatter/Parser
createCSVFormatter
Syntax
createCSVFormatter([format], [delimiter=','], [rowDelimiter=';'])
Parameter
- format (optional): A STRING vector.
- delimiter (optional): The separator between columns, and the default is ','.
- rowDelimiter (optional): The separator between the lines, and the default is ';'.
Details
Create a formatter function in CSV format.
Examples
MyFormat = take("", 3)
MyFormat[1] = "0.000"
formatter = rabbitmq::createCSVFormatter(MyFormat,',',';')
createCSVParser
Syntax
createCSVParser(schema, [delimiter=','], [rowDelimiter=';'])
Parameter
- schema: A vector indicating the data types of columns.
- delimiter (optional): The separator between columns, and the default is ','.
- rowDelimiter (optional): The separator between the lines, and the default is ';'.
Details
Create a parser function in CSV format.
Examples
parser = rabbitmq::createCSVParser([INT,STRING,INT], ',',';')
createProtoBuformatter
Syntax
createProtoBuformatter()
Parameter
None
Details
Create a formatter function in protobuf format.
Examples
formatter = rabbitmq::createProtoBuformatter()
createProtoBufParser
Syntax
createProtoBufParser()
Parameter
None
Details
Create a parser function in protobuf format.
Examples
parser = rabbitmq::createProtoBufParser()
createJSONFormatter
Syntax
createJSONFormatter()
Parameter
None
Details
Create a formatter function in JSON format.
Examples
formatter = rabbitmq::createJSONFormatter()
createJSONParser
Syntax
createJSONParser(schema, colNames)
Parameter
- schema: A vector indicating the data types of columns.
- colNames: A vector indicating the column names.
Details
Create a parser function in JSON format.
Examples
parser = rabbitmq::createJSONParser([INT,TIMESTAMP,INT], ["id","ts","volume"])
close
Syntax
close(handle)
Parameter
- handle: A handle returned by method
connection
orchannel
.
Details
Close a connection to RabbitMQ or a channel. Note that closing a connection closes all channels on the connection.
Examples
conn = rabbitmq::connection('localhost', 5672, 'guest', 'guest', "/")
rabbitmq::close(conn)
closeAll
Syntax
closeAll()
Parameter
None
Details
Close all existing connections and channels.
Examples
rabbitmq::closeAll()
getConnections
Syntax
getConnections()
Parameter
None
Details
Get the information on all existing connections to RabbitMQ.
Return a tuple where each element is a dictionary with the following key-value pairs:
- connection: The connection handle.
- createTime: A TIMESTAMP scalar indicating the creation time of each connection.
getChannels
Syntax
getChannels([connection])
Parameter
- connection (optional): A connection object. If specified, the method gets the information about the channels on the specified connection. If unspecified, the method gets the information about all existing channels.
Details
Get the channel information.
Return a tuple where each element is a dictionary with the following key-value pairs:
- connection: The connection handle.
- channel: The channel handle.
- createTime: A TIMESTAMP scalar indicating the creation time of each channel.
Note
- Methods
connection
andconnectionAMQP
throw an exception upon a failed connection. - If a method passed to a channel fails to execute, the entire channel becomes invalid. It is recommended to create a new channel for each method call, but doing so for each publish operation is not recommended.
- Apart from methods
publish
andconsume
, all member methods of a channel are synchronous. If they fail to execute, an exception is thrown, which may result from incorrect arguments (IllegalArgumentException
), method execution failure (RuntimeException
), or an invalid connection or channel (RuntimeException
). In the latter two cases, a new channel should be created.- If a channel function is called after invoking
close(channel)
, an exception is thrown: "Please create channel first". - If a channel function is called after invoking
close(connection)
, an exception is thrown: "failed to ..., refer to log for more information," and the log indicates "Frame could not be sent". - If a channel method is called on an invalid channel due to a closed connection or method execution failure, an exception is thrown: "failed to ..., refer to log for more information," with the log indicating "Frame could not be sent".
- When creating a channel, if the corresponding connection is closed or the maximum number of connections is reached, an exception is thrown.
- In summary: After a channel becomes invalid, it is recommended to create a new one. If an exception occurs during channel creation, a new connection needs to be created.
- If a channel function is called after invoking
- If the
publish
method fails, an exception is thrown. You can catch this exception to recreate a connection and channel and republish the message. - The
consume
method is synchronous, allowing users to know immediately whether the subscription succeeds. However, if it fails during subsequent message processing due to an invalid connection or channel, users are not notified of the failure. - Concurrent operations are not allowed on a single channel (or possibly a single connection). For multi-threaded operations, a separate channel/connection is required for each thread.
Examples
GUI1
// Load plugin
loadPlugin("/home/yxu/Desktop/DolphinDBPlugin/rabbitmq/build/PluginRabbitMQ.txt")
// Connect to RabbitMQ
conn = rabbitmq::connection('localhost', 5672, 'guest', 'guest', '');
// Create a channel
ch = rabbitmq::channel(conn)
// Declare an exchange
rabbitmq::declareExchange(ch, 'test-exchange', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange1', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange2', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange3', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange4', 'fanout', ['durable'])
// Bind an exchange to another exchange
rabbitmq::bindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')
// Remove a binding from an exchange to another exchange
rabbitmq::unbindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')
// Remove an exchange
rabbitmq::removeExchange(ch, 'test-exchange')
rabbitmq::removeExchange(ch, 'test-exchange1')
rabbitmq::removeExchange(ch, 'test-exchange2')
rabbitmq::removeExchange(ch, 'test-exchange3')
rabbitmq::removeExchange(ch, 'test-exchange4')
// Declare a queue
rabbitmq::declareQueue(ch, 'test-queue1', ['durable'])
rabbitmq::declareQueue(ch, 'test-queue2', ['durable'])
rabbitmq::declareQueue(ch, 'test-queue3', ['durable'])
rabbitmq::declareQueue(ch, 'test-queue4', ['durable'])
// Bind an exchange to a queue
rabbitmq::bindQueue(ch, 'test-exchange1', 'test-queue1', 'rule1')
rabbitmq::bindQueue(ch, 'test-exchange2', 'test-queue2', 'rule1')
rabbitmq::bindQueue(ch, 'test-exchange3', 'test-queue3', 'rule1')
rabbitmq::bindQueue(ch, 'test-exchange4', 'test-queue4', 'rule1')
// Remove a binding from an exchange to a queue
rabbitmq::unbindQueue(ch, 'test-exchange1', 'test-queue', 'rule1')
// Remove a queue
rabbitmq::removeQueue(ch, 'test-queue1')
rabbitmq::removeQueue(ch, 'test-queue2')
rabbitmq::removeQueue(ch, 'test-queue3')
rabbitmq::removeQueue(ch, 'test-queue4')
// Publish data to an exchange
rabbitmq::publish(ch, 'test-exchange1', 'rule1', 'Hello World1', 'bytestream')
rabbitmq::publish(ch, 'test-exchange2', 'rule1', 'Hello World1', 'bytestream')
// Subscribe to a queue for data consumption
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue1', handle, 'bytestream', 'consumer1')
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue2', handle, 'bytestream', 'consumer2')
// Cancel the subscription to a queue
rabbitmq::cancelConsume(ch, 'consumer1')
rabbitmq::cancelConsume(ch, 'consumer2')
// Publish data in JSON format
formatter = rabbitmq::createJSONFormatter()
data = table(1..10 as id, take(now(), 10) as ts, rand(10, 10) as volume)
rabbitmq::publish(ch, 'test-exchange3', 'rule1', data, formatter)
// Publish data in CSV format
MyFormat = take("", 3)
MyFormat[1] = "0.000"
print(MyFormat)
data = table(1..10 as id, take(now(), 10) as ts, rand(10, 10) as volume)
formatter = rabbitmq::createCSVFormatter(MyFormat,',',';')
rabbitmq::publish(ch, 'test-exchange4', 'rule1', data, formatter)
// Consume data in JSON format
parser = rabbitmq::createJSONParser([INT,TIMESTAMP,INT], ["id","ts","volume"])
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue3', handle, parser, 'consumer3')
// Consume data in CSV format
parser1 = rabbitmq::createCSVParser([INT,STRING,INT], ',',';')
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue4', handle, parser1, 'consumer4')
rabbitmq::cancelConsume(ch, 'consumer3')
rabbitmq::cancelConsume(ch, 'consumer4')
// Get the information on all subscriptions in the background
tb = rabbitmq::getSubJobStat()
print(tb)
// Cancel a subscription in the background
rabbitmq::cancelSubJob(subscription)
GUI2
// After subscribing to test-queue2, GUI2 can be closed
def handle(msg){
}
rabbitmq::createSubJob('test-queue2', handle, 'bytestream', 'consumer1',['localhost'], [5672], 'guest', 'guest')
// Consume JSON-format data in the background
parser = rabbitmq::createJSONParser([INT,TIMESTAMP,INT], ["id","ts","volume"])
def handle(msg){
}
rabbitmq::createSubJob('test-queue3', handle, parser, 'consumer1',['localhost'], [5672], 'guest', 'guest')
// Consume CSV-format data in the background
parser1 = rabbitmq::createCSVParser([INT,STRING,INT], ',',';')
def handle(msg){
}
rabbitmq::createSubJob('test-queue4', handle, parser1, 'consumer1',['localhost'], [5672], 'guest', 'guest')