RabbitMQ

Installation (with installPlugin)

Required server version: DolphinDB 2.00.10 or higher

OS: Linux x64

Installation Steps:

(1) Use listRemotePlugins to check plugin information in the plugin repository.

Note: For plugins not included in the provided list, you can install through precompiled binaries or compile from source. These files can be accessed from our GitHub repository by switching to the appropriate version branch.

login("admin", "123456")
listRemotePlugins()

(2) Invoke installPlugin for plugin installation.

installPlugin("rabbitmq")

(3) Use loadPlugin to load the plugin before using the plugin methods.

loadPlugin("rabbitmq")

Method References

Basic Methods

connection

Syntax

connection(host, [port=5672], [username='guest'], [password='guest'], [vhost="/"])

Parameters

  • host: A STRING scalar indicating the IP address of the host.
  • port (optional): An INT scalar indicating the port number of the host. The default value is 5672.
  • username (optional): A STRING scalar indicating the username. The default value is 'guest'.
  • password (optional): A STRING scalar indicating the password. The default value is 'guest'.
  • vhost (optional): A STRING scalar indicating the virtual host. The default value is "/".

Details

Connect to RabbitMQ using the specified information and return a connection object.

Examples

conn = rabbitmq::connection('localhost', 5672, 'guest', 'guest', "/")

connectionAMQP

Syntax

connectionAMQP(amqpURI)

Parameters

  • amqpURI: A STRING vector indicating the AMQP URI(s).

Details

Connect to RabbitMQ using the AMQP URI(s) and return a connection object.

Examples

conn = rabbitmq::connectionAMQP(['amqp://guest:guest@192.168.0.1:5672', 'amqp://guest:guest@192.168.0.2:5672', 'amqp://guest:guest@192.168.0.3:5672'])

channel

Syntax

channel(connection)

Parameters

  • connection: A RabbitMQ connection object.

Details

Create a channel using the specified connection object and return a channel object.

Examples

ch = rabbitmq::channel(conn)

declareExchange

Syntax

declareExchange(channel, name, [type='fanout'], [flags])

Parameters

  • channel: A channel object.
  • name: A STRING scalar indicating the name of the exchange.
  • type (optional): A STRING scalar indicating the type of the exchange. It can be 'fanout' (default), 'direct', 'topic', 'headers', or 'consistent_hash'.
  • flags (optional): A STRING vector indicating the flags of the exchange. It can be 'durable', 'autodelete', 'passive', or 'internal'.

Details

Declare an exchange.

Examples

rabbitmq::declareExchange(ch, 'test-exchange', 'fanout', ['autodelete'])

bindExchange

Syntax

bindExchange(channel, source, target, routingkey)

Parameters

  • channel: A channel object.
  • source: The name of the source exchange to be bound.
  • target: The name of the target exchange to bind to.
  • routingkey: The routing key binding the two exchanges.

Details

Bind an exchange to another exchange.

Examples

rabbitmq::bindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')

unbindExchange

Syntax

unbindExchange(channel, source, target, routingkey)

Parameters

Same as those of the bindExchange method.

Details

Remove a binding from an exchange to another exchange.

Examples

rabbitmq::unbindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')

removeExchange

Syntax

removeExchange(channel, name, [flags])

Parameters

  • channel: A channel object.
  • name: A STRING scalar indicating the name of the exchange.
  • flags (optional): A STRING vector indicating the flags of the exchange. It can be 'ifunused'.

Details

Remove an exchange.

Examples

rabbitmq::removeExchange(ch, 'test-exchange')

declareQueue

Syntax

declareQueue(channel, name, [flags], [arguments])

Parameters

  • channel: A channel object.
  • name: A STRING scalar indicating the name of the queue.
  • flags (optional): A STRING vector indicating the flags of the queue. It can be 'durable', 'autodelete', 'passive', or 'exclusive'.
  • arguments (optional): A dictionary with STRING keys and STRING/CHAR/LONG/INT/BOOL values, indicating the optional arguments of the queue.

Details

Declare a queue.

Examples

arguments = dict(STRING, ANY)
arguments['x-max-length'] = 10000
arguments['x-overflow'] = 'drop-head'
arguments['x-queue-type'] = 'quorum'
// Quorum queues must be durable and cannot be exclusive
rabbitmq::declareQueue(ch, 'test', ['durable'], arguments)

bindQueue

Syntax

bindQueue(channel, exchange, queue, routingkey)

Parameters

  • channel: A channel object.
  • exchange: The name of the exchange to be bound.
  • queue: The name of the queue to bind to.
  • routingkey: The routing key binding the exchange and the queue.

Details

Bind an exchange to a queue.

Examples

rabbitmq::bindQueue(ch, 'test-exchange1', 'test-queue1', 'rule1')

unbindQueue

Syntax

unbindQueue(channel, exchange, queue, routingkey)

Parameters

Same as those of the bindQueue method.

Details

Remove a binding from an exchange to a queue.

Examples

rabbitmq::unbindQueue(ch, 'test-exchange1', 'test-queue1', 'rule1')

removeQueue

Syntax

removeQueue(channel, name, [flags])

Parameters

  • channel: A channel object.
  • name: A STRING scalar indicating the name of the queue.
  • flags (optional): A STRING vector indicating the flags of the queue. It can be 'ifunused' or 'ifempty'.

Details

Remove a queue.

Examples

rabbitmq::removeQueue(ch, 'test-queue1')

publish

Syntax

publish(channel, exchange, routingkey, message, [format='default'], [flags])

Parameters

  • channel: A channel object.
  • exchange: The name of the exchange to publish to.
  • routingkey: The routing key specifying which queue to publish to.
  • message: The message to be published.
  • format (optional): The format in which the published message is packaged. The default value is 'default'. It can be:
    • A function created by method createJSONFormatter or createCSVFormatter.
    • A STRING scalar that can be:
      • default: DolphinDB’s default format
      • bytestream: byte stream format
      • protobuf: protocol buffer format
  • flags (optional): A STRING vector indicating the flags of the method. It can be 'mandatory' or 'immediate'.

Details

Publish a message from DolphinDB to an exchange in the specified format.

Examples

rabbitmq::publish(ch, 'test-exchange1', 'rule1', 'Hello World1', 'bytestream')

consume

Syntax

consume(channel, queue, handle, parser, tag, [flags], [cancelCallback])

Parameters

  • channel: A channel object.
  • queue: The name of the subscribed queue.
  • handle: A unary function used to process the subscribed data. Note that the function cannot operate on DFS tables, such as reading, writing, or schema retrieval.
  • parser: Used to parse the subscribed data into the specified format. It can be:
    • A function created by method createJSONParser or createCSVParser.
    • A STRING scalar that can be:
      • default: DolphinDB’s default format
      • bytestream: byte stream format
      • protobuf: protocol buffer format
  • tag: A STRING scalar indicating the consumer tag.
  • flags (optional): A STRING vector indicating the flags of the method. It can be 'nolocal', 'noack', or 'exclusive'.
  • cancelCallback (optional): A user-defined unary function, which takes the parameter tag as its input. This function is called when a subscription is canceled due to RabbitMQ server issues, such as the subscribed queue being deleted or the node storing the queue being terminated.

Details

Subscribe to a queue.

Examples

def handleMsg(msg){
	// handle received message
}
def cancelNotify(tag){
	writeLog(tag)
}
rabbitmq::consume(ch, 'test-queue1', handleMsg, 'bytestream', 'consumer1', ,cancelNotify)
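
Because handle cannot operate on DFS tables, one option is to collect parsed messages in a shared in-memory table and persist them from there. The following is a minimal sketch, not a definitive pattern. It assumes that a parser created by createJSONParser passes the handler a table matching the given schema, and that a partial application such as append!{rmqOut} is accepted as the unary handler. The table name rmqOut is illustrative.

```dolphindb
// Shared in-memory stream table that receives the parsed messages (name is illustrative)
share streamTable(1000:0, `id`ts`volume, [INT, TIMESTAMP, INT]) as rmqOut
// Parser with the same column types and names as rmqOut
parser = rabbitmq::createJSONParser([INT,TIMESTAMP,INT], ["id","ts","volume"])
// Assumption: each parsed message arrives as a table, so append! can insert it directly
rabbitmq::consume(ch, 'test-queue3', append!{rmqOut}, parser, 'consumer3')
```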

cancelConsume

Syntax

cancelConsume(channel, tag)

Parameters

  • channel: A channel object.
  • tag: A STRING scalar indicating the consumer tag.

Details

Cancel the subscription to a queue.

Examples

rabbitmq::cancelConsume(ch, 'consumer1')

Background Subscription

createSubJob

Syntax

createSubJob(queue, handle, parser, tag, hosts, ports, [username='guest'], [password='guest'], [vhost="/"], [flags], [cancelCallback])

Parameters

  • queue: The name of the subscribed queue.
  • handle: A unary function used to process the subscribed data.
  • parser: A STRING scalar or a function used to parse the subscribed data into specified format (same as method consume).
  • tag: A STRING scalar indicating the consumer tag.
  • hosts: A STRING vector indicating the IP address of each host in the cluster.
  • ports: An INT vector indicating the port number of each host in the cluster, corresponding to the elements in parameter hosts.
  • username (optional): A STRING scalar indicating the username. The default value is 'guest'.
  • password (optional): A STRING scalar indicating the password. The default value is 'guest'.
  • vhost (optional): A STRING scalar indicating the virtual host. The default value is "/".
  • flags (optional): A STRING vector indicating the flags of the method. It can be 'nolocal', 'noack', or 'exclusive'.
  • cancelCallback (optional): A user-defined unary function, which takes the parameter tag as its input. This function is called when a subscription is canceled due to RabbitMQ server issues, such as the subscribed queue being deleted or the node storing the queue being terminated.

Details

Subscribe to a queue in the background.

Examples

def handle(msg){
}
rabbitmq::createSubJob('test-queue2', handle, 'bytestream', 'consumer1',['localhost'], [5672], 'guest', 'guest')
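
For a RabbitMQ cluster, hosts and ports are parallel vectors: the i-th port belongs to the i-th host. The sketch below illustrates this with three nodes. The addresses are placeholders borrowed from the connectionAMQP example, and the call otherwise mirrors the single-host example above.

```dolphindb
def handle(msg){
	// handle received message
}
// hosts[i] is paired with ports[i]; the addresses are placeholders
hosts = ['192.168.0.1', '192.168.0.2', '192.168.0.3']
ports = [5672, 5672, 5672]
rabbitmq::createSubJob('test-queue2', handle, 'bytestream', 'consumer1', hosts, ports, 'guest', 'guest')
```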

getSubJobStat

Syntax

getSubJobStat()

Parameters

None

Details

Get information on all background subscriptions. Return a table with the following columns:

  • subJobId: subscription ID
  • subQueue: name of the subscribed queue
  • tag: consumer tag
  • createTimestamp: creation timestamp of the subscription
  • errMsg: error message thrown when the subscription is canceled

Examples

tb = rabbitmq::getSubJobStat()
print(tb)

cancelSubJob

Syntax

cancelSubJob(subscription)

Parameters

  • subscription: The ID of the subscription to be canceled.

Details

Cancel a subscription in the background.
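
Examples

A possible way to obtain the subscription ID is to look it up in the table returned by getSubJobStat. This sketch assumes that cancelSubJob accepts the values of the subJobId column as they are returned. The tag 'consumer1' is only an example.

```dolphindb
// List background subscriptions and pick those registered under a given consumer tag
stat = rabbitmq::getSubJobStat()
ids = exec subJobId from stat where tag = 'consumer1'
// Cancel each matching subscription; the loop does nothing if no job matches
for (id in ids) {
	rabbitmq::cancelSubJob(id)
}
```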

Formatter/Parser

createCSVFormatter

Syntax

createCSVFormatter([format], [delimiter=','], [rowDelimiter=';'])

Parameters

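  • format (optional): A STRING vector indicating the output format of each column. In the example below, the second element "0.000" formats the second column and the empty strings leave the other columns unformatted.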
  • delimiter (optional): The separator between columns. The default value is ','.
  • rowDelimiter (optional): The separator between rows. The default value is ';'.

Details

Create a formatter function in CSV format.

Examples

MyFormat = take("", 3)
MyFormat[1] = "0.000"
formatter = rabbitmq::createCSVFormatter(MyFormat,',',';')

createCSVParser

Syntax

createCSVParser(schema, [delimiter=','], [rowDelimiter=';'])

Parameters

  • schema: A vector indicating the data types of columns.
  • delimiter (optional): The separator between columns. The default value is ','.
  • rowDelimiter (optional): The separator between rows. The default value is ';'.

Details

Create a parser function in CSV format.

Examples

parser = rabbitmq::createCSVParser([INT,STRING,INT], ',',';')

createProtoBuformatter

Syntax

createProtoBuformatter()

Parameters

None

Details

Create a formatter function in protobuf format.

Examples

formatter = rabbitmq::createProtoBuformatter()

createProtoBufParser

Syntax

createProtoBufParser()

Parameters

None

Details

Create a parser function in protobuf format.

Examples

parser = rabbitmq::createProtoBufParser()

createJSONFormatter

Syntax

createJSONFormatter()

Parameters

None

Details

Create a formatter function in JSON format.

Examples

formatter = rabbitmq::createJSONFormatter()

createJSONParser

Syntax

createJSONParser(schema, colNames)

Parameters

  • schema: A vector indicating the data types of columns.
  • colNames: A vector indicating the column names.

Details

Create a parser function in JSON format.

Examples

parser = rabbitmq::createJSONParser([INT,TIMESTAMP,INT], ["id","ts","volume"])

close

Syntax

close(handle)

Parameters

  • handle: A handle returned by method connection or channel.

Details

Close a connection to RabbitMQ or a channel. Note that closing a connection closes all channels on the connection.

Examples

conn = rabbitmq::connection('localhost', 5672, 'guest', 'guest', "/")
rabbitmq::close(conn)

closeAll

Syntax

closeAll()

Parameters

None

Details

Close all existing connections and channels.

Examples

rabbitmq::closeAll()

getConnections

Syntax

getConnections()

Parameters

None

Details

Get the information on all existing connections to RabbitMQ.

Return a tuple where each element is a dictionary with the following key-value pairs:

  • connection: The connection handle.
  • createTime: A TIMESTAMP scalar indicating the creation time of each connection.

getChannels

Syntax

getChannels([connection])

Parameters

  • connection (optional): A connection object. If specified, the method gets the information about the channels on the specified connection. If unspecified, the method gets the information about all existing channels.

Details

Get the channel information.

Return a tuple where each element is a dictionary with the following key-value pairs:

  • connection: The connection handle.
  • channel: The channel handle.
  • createTime: A TIMESTAMP scalar indicating the creation time of each channel.
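
Examples

Neither getConnections nor getChannels comes with an example, so the following sketch shows one way the returned tuples might be inspected. It assumes a RabbitMQ server on localhost with the default credentials. Indexing a tuple element by key follows the dictionary layout described above.

```dolphindb
conn = rabbitmq::connection('localhost', 5672, 'guest', 'guest', "/")
ch = rabbitmq::channel(conn)
// One dictionary per open connection
conns = rabbitmq::getConnections()
print(conns[0]["createTime"])
// Channels on conn only; omit the argument to list every existing channel
chs = rabbitmq::getChannels(conn)
print(size(chs))
```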

Note

  • Methods connection and connectionAMQP throw an exception upon a failed connection.
  • If a method called on a channel fails to execute, the entire channel becomes invalid. It is therefore recommended to create a new channel for each method call, except for publish, where creating a channel per operation is not recommended.
  • Apart from methods publish and consume, all member methods of a channel are synchronous. If they fail to execute, an exception is thrown, which may result from incorrect arguments (IllegalArgumentException), method execution failure (RuntimeException), or an invalid connection or channel (RuntimeException). In the latter two cases, a new channel should be created.
    • If a channel method is called after invoking close(channel), an exception is thrown: "Please create channel first".
    • If a channel method is called after invoking close(connection), an exception is thrown: "failed to ..., refer to log for more information", and the log indicates "Frame could not be sent".
    • If a channel method is called on a channel that has become invalid due to a closed connection or a failed method execution, the same exception is thrown: "failed to ..., refer to log for more information", with the log indicating "Frame could not be sent".
    • When creating a channel, if the corresponding connection is closed or the maximum number of connections is reached, an exception is thrown.
    • In summary: After a channel becomes invalid, it is recommended to create a new one. If an exception occurs during channel creation, a new connection needs to be created.
  • If the publish method fails, an exception is thrown. You can catch this exception to recreate a connection and channel and republish the message.
  • The consume method is synchronous, allowing users to know immediately whether the subscription succeeds. However, if it fails during subsequent message processing due to an invalid connection or channel, users are not notified of the failure.
  • Concurrent operations are not allowed on a single channel (or possibly a single connection). For multi-threaded operations, a separate channel/connection is required for each thread.
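
The recovery path for a failed publish described in the notes above can be sketched as follows. This is a minimal illustration rather than a recommended implementation: the function name republishOnFailure is hypothetical, the connection settings are the defaults used elsewhere in this document, and the sketch retries only once.

```dolphindb
// Hypothetical helper: publish on ch; on failure, rebuild the connection and channel and republish
def republishOnFailure(ch, exchange, routingkey, msg){
	try {
		rabbitmq::publish(ch, exchange, routingkey, msg, 'bytestream')
		return ch
	} catch(ex) {
		// Report the failure; the old channel is treated as invalid from here on
		print(ex)
	}
	newConn = rabbitmq::connection('localhost', 5672, 'guest', 'guest', "/")
	newCh = rabbitmq::channel(newConn)
	// A second failure is not caught here and propagates to the caller
	rabbitmq::publish(newCh, exchange, routingkey, msg, 'bytestream')
	return newCh
}
// Keep the returned channel so that later calls use the valid one
ch = republishOnFailure(ch, 'test-exchange1', 'rule1', 'Hello World1')
```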

Examples

GUI1

// Load plugin
loadPlugin("/home/yxu/Desktop/DolphinDBPlugin/rabbitmq/build/PluginRabbitMQ.txt")
// Connect to RabbitMQ
conn = rabbitmq::connection('localhost', 5672, 'guest', 'guest', '');
// Create a channel
ch = rabbitmq::channel(conn)
// Declare an exchange
rabbitmq::declareExchange(ch, 'test-exchange', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange1', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange2', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange3', 'fanout', ['durable'])
rabbitmq::declareExchange(ch, 'test-exchange4', 'fanout', ['durable'])
// Bind an exchange to another exchange
rabbitmq::bindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')
// Remove a binding from an exchange to another exchange
rabbitmq::unbindExchange(ch, 'test-exchange1', 'test-exchange2', 'rule1')
// Remove an exchange
rabbitmq::removeExchange(ch, 'test-exchange')
rabbitmq::removeExchange(ch, 'test-exchange1')
rabbitmq::removeExchange(ch, 'test-exchange2')
rabbitmq::removeExchange(ch, 'test-exchange3')
rabbitmq::removeExchange(ch, 'test-exchange4')
// Declare a queue
rabbitmq::declareQueue(ch, 'test-queue1', ['durable'])
rabbitmq::declareQueue(ch, 'test-queue2', ['durable'])
rabbitmq::declareQueue(ch, 'test-queue3', ['durable'])
rabbitmq::declareQueue(ch, 'test-queue4', ['durable'])
// Bind an exchange to a queue
rabbitmq::bindQueue(ch, 'test-exchange1', 'test-queue1', 'rule1')
rabbitmq::bindQueue(ch, 'test-exchange2', 'test-queue2', 'rule1')
rabbitmq::bindQueue(ch, 'test-exchange3', 'test-queue3', 'rule1')
rabbitmq::bindQueue(ch, 'test-exchange4', 'test-queue4', 'rule1')
// Remove a binding from an exchange to a queue
rabbitmq::unbindQueue(ch, 'test-exchange1', 'test-queue1', 'rule1')
// Remove a queue
rabbitmq::removeQueue(ch, 'test-queue1')
rabbitmq::removeQueue(ch, 'test-queue2')
rabbitmq::removeQueue(ch, 'test-queue3')
rabbitmq::removeQueue(ch, 'test-queue4')
// Publish data to an exchange
rabbitmq::publish(ch, 'test-exchange1', 'rule1', 'Hello World1', 'bytestream')
rabbitmq::publish(ch, 'test-exchange2', 'rule1', 'Hello World1', 'bytestream')
// Subscribe to a queue for data consumption
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue1', handle, 'bytestream', 'consumer1')

def handle(msg){
}
rabbitmq::consume(ch, 'test-queue2', handle, 'bytestream', 'consumer2')

// Cancel the subscription to a queue
rabbitmq::cancelConsume(ch, 'consumer1')
rabbitmq::cancelConsume(ch, 'consumer2')

// Publish data in JSON format
formatter = rabbitmq::createJSONFormatter()
data = table(1..10 as id, take(now(), 10) as ts, rand(10, 10) as volume)
rabbitmq::publish(ch, 'test-exchange3', 'rule1', data, formatter)

// Publish data in CSV format
MyFormat = take("", 3)
MyFormat[1] = "0.000"
print(MyFormat)
data = table(1..10 as id, take(now(), 10) as ts, rand(10, 10) as volume)
formatter = rabbitmq::createCSVFormatter(MyFormat,',',';')
rabbitmq::publish(ch, 'test-exchange4', 'rule1', data, formatter)

// Consume data in JSON format
parser = rabbitmq::createJSONParser([INT,TIMESTAMP,INT], ["id","ts","volume"])
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue3', handle, parser, 'consumer3')

// Consume data in CSV format
parser1 = rabbitmq::createCSVParser([INT,STRING,INT], ',',';')
def handle(msg){
}
rabbitmq::consume(ch, 'test-queue4', handle, parser1, 'consumer4')

rabbitmq::cancelConsume(ch, 'consumer3')
rabbitmq::cancelConsume(ch, 'consumer4')

// Get the information on all subscriptions in the background
tb = rabbitmq::getSubJobStat()
print(tb)
// Cancel a subscription in the background
// (subscription is the ID of the job to cancel, i.e. a subJobId value from getSubJobStat)
rabbitmq::cancelSubJob(subscription)

GUI2

// After subscribing to test-queue2, GUI2 can be closed
def handle(msg){
}
rabbitmq::createSubJob('test-queue2', handle, 'bytestream', 'consumer1',['localhost'], [5672], 'guest', 'guest')

// Consume JSON-format data in the background
parser = rabbitmq::createJSONParser([INT,TIMESTAMP,INT], ["id","ts","volume"])
def handle(msg){
}
rabbitmq::createSubJob('test-queue3', handle, parser, 'consumer1',['localhost'], [5672], 'guest', 'guest')

// Consume CSV-format data in the background
parser1 = rabbitmq::createCSVParser([INT,STRING,INT], ',',';')
def handle(msg){
}
rabbitmq::createSubJob('test-queue4', handle, parser1, 'consumer1',['localhost'], [5672], 'guest', 'guest')