pulsar

Apache Pulsar is a multi-tenant, high-performance solution for server-to-server messaging. The DolphinDB Pulsar plugin enables the use of Apache Pulsar in DolphinDB. With this plugin, users can connect to a Pulsar broker, create a producer to write messages to a topic, and create a consumer to subscribe to and read messages from a topic.

The Pulsar plugin currently only supports writing and subscribing to messages of STRING type.

Installation (with installPlugin)

Required server version: DolphinDB 2.00.10 or higher

Installation Steps:

(1) Use listRemotePlugins to check plugin information in the plugin repository.

Note: For plugins not included in the provided list, you can install through precompiled binaries or compile from source. These files can be accessed from our GitHub repository by switching to the appropriate version branch.

login("admin", "123456")
listRemotePlugins()

(2) Invoke installPlugin to install the plugin.

installPlugin("pulsar")

(3) Use loadPlugin to load the plugin before using the plugin methods.

loadPlugin("pulsar")

Method References

client

Syntax

client(serviceUrl, [clientConfig])

Parameters

  • serviceUrl: a string indicating the service URL for the Pulsar cluster to connect to, e.g. pulsar://localhost:6650.
  • clientConfig (optional): a dict(STRING, ANY) containing the following configuration parameters of the client:
    • operationTimeoutSeconds: the operation timeout, in seconds
    • connectionTimeoutMs: the connection timeout, in milliseconds

Details

Create a Pulsar client object connected to the cluster at the specified service URL.

Return Value

A Pulsar client object.
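A minimal sketch of creating a client with both documented configuration parameters, assuming the plugin has already been loaded and a broker is listening at pulsar://localhost:6650. The timeout values are illustrative placeholders, not recommended defaults.

```dolphindb
// build the optional client configuration as dict(STRING, ANY)
clientConfig = dict(STRING, ANY)
clientConfig["operationTimeoutSeconds"] = 30   // illustrative value
clientConfig["connectionTimeoutMs"] = 5000     // illustrative value

// connect to the cluster; the URL assumes a local broker
client = pulsar::client("pulsar://localhost:6650", clientConfig)
```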

producer

Syntax

producer(client, topic, [producerConfig])

Parameters

  • client: a Pulsar client object.
  • topic: a string indicating the topic name.
  • producerConfig (optional): a dict(STRING, ANY) containing the following configuration parameters of the producer:
    • sendTimeoutMs
    • maxPendingMessages
    • maxPendingMessagesAcrossPartitions
    • blockIfQueueFull
    • batchingEnabled
    • batchingMaxMessages
    • batchingMaxPublishDelayMs

Details

Create a Pulsar producer object that writes messages to the specified topic.

Return Value

A Pulsar producer object.
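The sketch below passes a producerConfig that enables batching. It assumes a local broker and reuses the topic name from the Examples section; the specific values, and the assumption that a boolean option such as batchingEnabled accepts a DolphinDB BOOL, are illustrative.

```dolphindb
client = pulsar::client("pulsar://localhost:6650")

// optional producer settings as dict(STRING, ANY)
producerConfig = dict(STRING, ANY)
producerConfig["sendTimeoutMs"] = 30000       // illustrative value
producerConfig["batchingEnabled"] = true      // assumed to accept BOOL
producerConfig["batchingMaxMessages"] = 1000  // illustrative value

producer = pulsar::producer(client, "persistent://public/default/my-topic", producerConfig)
```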

send

Syntax

send(producer, message, [partitionKey])

Parameters

  • producer: a Pulsar producer object.
  • message: a string indicating the message to be sent.
  • partitionKey (optional): a string indicating the partition key of the message, which determines the partition the message is routed to.

Details

Send the specified message to the topic of the producer.

Return Value

Void
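For example, the following sketch sends one keyed message followed by a few unkeyed ones, assuming a local broker. The key "customer-42" and the message bodies are hypothetical. Because the plugin only handles STRING messages, the loop converts the counter with string() before sending.

```dolphindb
client = pulsar::client("pulsar://localhost:6650")
producer = pulsar::producer(client, "persistent://public/default/my-topic")

// keyed message: on a partitioned topic, messages with the same key go to the same partition
pulsar::send(producer, "order created", "customer-42")

// unkeyed messages; non-string values are converted to STRING first
for (i in 1..3) {
    pulsar::send(producer, "message " + string(i))
}
```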

consumer

Syntax

consumer(client, topic, subscriptionName, [consumerConfig])

Parameters

  • client: a Pulsar client object.
  • topic: a string indicating the topic name.
  • subscriptionName: a string indicating the subscription name.
  • consumerConfig (optional): a dict(STRING, ANY) containing the following configuration parameters of the consumer:
    • consumerType (of INT type)
      • 0: ConsumerExclusive
      • 1: ConsumerShared
      • 2: ConsumerFailover
      • 3: ConsumerKeyShared
    • receiverQueueSize
    • maxTotalReceiverQueueSizeAcrossPartitions
    • unAckedMessagesTimeoutMs
    • subscriptionInitialPosition (of INT type)
      • 0: InitialPositionLatest
      • 1: InitialPositionEarliest

Details

Create a Pulsar consumer object that subscribes to the specified topic.

Return Value

A Pulsar consumer object.
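As an illustration of the integer-coded options, the sketch below creates a shared consumer that starts from the earliest available message. It assumes a local broker; the topic and subscription names are placeholders borrowed from the Examples section.

```dolphindb
client = pulsar::client("pulsar://localhost:6650")

// optional consumer settings as dict(STRING, ANY)
consumerConfig = dict(STRING, ANY)
consumerConfig["consumerType"] = 1                 // 1: ConsumerShared
consumerConfig["subscriptionInitialPosition"] = 1  // 1: InitialPositionEarliest

consumer = pulsar::consumer(client, "persistent://public/default/my-topic", "consumer-1", consumerConfig)
```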

receive

Syntax

receive(consumer, [timeoutMs=1000])

Parameters

  • consumer: a Pulsar consumer object.
  • timeoutMs (optional): an integer (in milliseconds) indicating the receive timeout. The default value is 1000.

Details

Receive a message from the topic that consumer is subscribed to.

Return Value

A string indicating the received message.
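A minimal sketch that overrides the default timeout, assuming a local broker and the placeholder topic and subscription names from the Examples section. What receive returns or raises when no message arrives within the timeout is not specified here, so the sketch makes a single call rather than looping on it.

```dolphindb
client = pulsar::client("pulsar://localhost:6650")
consumer = pulsar::consumer(client, "persistent://public/default/my-topic", "consumer-1")

// wait up to 5000 ms for the next message instead of the default 1000 ms
msg = pulsar::receive(consumer, 5000)
print(msg)
```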

createSubJob

Syntax

createSubJob(jobName, client, topic, subscriptionName, table, parser, [consumerConfig])

Parameters

  • jobName: a string indicating the name of the subscription job.
  • client: a Pulsar client object.
  • topic: a string indicating the topic name.
  • subscriptionName: a string indicating the subscription name.
  • table: a table to store the received data.
  • parser: a function to process the received data.
    • The function takes one to three STRING arguments, namely the message data, partition key, and topic, in that order, and returns a table.
  • consumerConfig (optional): a dict(STRING, ANY) containing the configuration parameters of the consumer. For details of the parameters, see consumer.

Details

Create a subscription job that subscribes to a topic and continuously receives data. The received data is processed by parser and stored in table.

Return Value

The handle of the subscription job.
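The sketch below uses a parser that takes all three arguments and stores each message together with its partition key and topic. The job name "pulsarJob1", the subscription name "sub-1", the table name pulsarMsgs and the parser parseMsg are hypothetical. Sharing the target table is an assumption made because the job writes to it from a background thread; the parser's output columns are chosen to match the table schema.

```dolphindb
client = pulsar::client("pulsar://localhost:6650")

// shared in-memory table that receives the parsed messages
share table(10000:0, `body`partitionKey`topicName, [STRING, STRING, STRING]) as pulsarMsgs

// hypothetical parser: message data, partition key, topic -> one-row table
def parseMsg(body, partitionKey, topicName) {
    return table([body] as body, [partitionKey] as partitionKey, [topicName] as topicName)
}

job = pulsar::createSubJob("pulsarJob1", client, "persistent://public/default/my-topic", "sub-1", pulsarMsgs, parseMsg)

// later: check how many messages have been stored
select count(*) from pulsarMsgs
```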

cancelSubJob

Syntax

cancelSubJob(subJob)

Parameters

  • subJob: the name (of STRING type) or handle of the subscription job.

Details

Cancel the subscription job.

Return Value

Void
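For instance, assuming a subscription job was created earlier under the hypothetical name "pulsarJob1", it can be cancelled by name; the commented line shows the equivalent call using the handle returned by createSubJob.

```dolphindb
// cancel by the job name passed to createSubJob
pulsar::cancelSubJob("pulsarJob1")

// or, equivalently, by the handle returned by createSubJob:
// pulsar::cancelSubJob(job)
```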

getJobStat

Syntax

getJobStat()

Details

Get the information on all subscription jobs.

Return Value

A table containing the information on all subscription jobs.
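A minimal usage sketch, assuming the plugin is loaded. The columns of the returned table are not listed here, so the sketch simply prints the whole table.

```dolphindb
// inspect all Pulsar subscription jobs
stat = pulsar::getJobStat()
print(stat)
```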

getConfig

Syntax

getConfig(handle, configName)

Parameters

  • handle: a Pulsar client, producer, consumer, or subscription job.
  • configName: a string indicating the name of the configuration parameter. For details about the configuration parameters of client, producer, and consumer, refer to the corresponding section.

Details

Get the value of the specified configuration parameter.

Return Value

The value of the specified configuration parameter.
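The sketch below reads back one parameter that was set explicitly and one that was left at its default, assuming a local broker and the placeholder names from the Examples section. No expected values are shown, since the defaults depend on the underlying Pulsar client.

```dolphindb
clientConfig = dict(STRING, ANY)
clientConfig["connectionTimeoutMs"] = 2000
client = pulsar::client("pulsar://localhost:6650", clientConfig)
consumer = pulsar::consumer(client, "persistent://public/default/my-topic", "consumer-1")

// a value that was set explicitly on the client
pulsar::getConfig(client, "connectionTimeoutMs")

// a consumer setting that was left at its default
pulsar::getConfig(consumer, "receiverQueueSize")
```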

Examples

loadPlugin("/path/to/PluginPulsar.txt");

// CLIENT

// define the client configuration dictionary
clientConfig = dict(["connectionTimeoutMs"], [2000])

client = pulsar::client("pulsar://localhost:6650", clientConfig)

// PRODUCER

producer = pulsar::producer(client, "persistent://public/default/my-topic")

pulsar::send(producer, "my message")

// CONSUMER

consumer = pulsar::consumer(client, "persistent://public/default/my-topic", "consumer-1")

pulsar::receive(consumer)

// SUBSCRIPTION JOB

// define parser
def parser(x) {
     return table([x] as msg)