pulsar
Apache Pulsar is a multi-tenant, high-performance solution for server-to-server messaging. The DolphinDB Pulsar plugin enables the use of Apache Pulsar in DolphinDB. With this plugin, users can connect to a Pulsar broker, create a producer to write messages to a topic, and create a consumer to subscribe to a topic and read its messages.
The Pulsar plugin currently only supports writing and subscribing to messages of STRING type.
Installation (with installPlugin)
Required server version: DolphinDB 2.00.10 or higher
Installation Steps:
(1) Use listRemotePlugins to check plugin information in the plugin repository.
Note: For plugins not included in the provided list, you can install through precompiled binaries or compile from source. These files can be accessed from our GitHub repository by switching to the appropriate version branch.
login("admin", "123456")
listRemotePlugins()
(2) Invoke installPlugin to install the plugin.
installPlugin("pulsar")
(3) Use loadPlugin to load the plugin before using the plugin methods.
loadPlugin("pulsar")
Method References
client
Syntax
client(serviceUrl, [clientConfig])
Parameters
- serviceUrl: a string indicating the service URL for the Pulsar cluster to connect to, e.g. pulsar://localhost:6650.
- clientConfig (optional): a dict(STRING, ANY) containing the following configuration parameters of the client:
  - operationTimeoutSeconds
  - connectionTimeoutMs
Details
Create a Pulsar client object connected to the cluster at the specified service URL.
Return Value
A Pulsar client object.
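A minimal sketch of creating a client with both documented configuration parameters. It assumes a broker reachable at pulsar://localhost:6650; the timeout values are illustrative, not recommended defaults.

```dolphindb
// Load the plugin; print (rather than fail on) the error raised if it is already loaded
try { loadPlugin("pulsar") } catch(ex) { print(ex) }

// Optional client configuration: STRING keys, values of any type
clientConfig = dict(STRING, ANY)
clientConfig["operationTimeoutSeconds"] = 30   // illustrative value
clientConfig["connectionTimeoutMs"] = 5000     // illustrative value

// Assumption: a Pulsar broker is listening on localhost:6650
client = pulsar::client("pulsar://localhost:6650", clientConfig)
```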
producer
Syntax
producer(client, topic, [producerConfig])
Parameters
- client: a Pulsar client object.
- topic: a string indicating the topic name.
- producerConfig (optional): a dict(STRING, ANY) containing the following configuration parameters of the producer:
  - sendTimeoutMs
  - maxPendingMessages
  - maxPendingMessagesAcrossPartitions
  - blockIfQueueFull
  - batchingEnabled
  - batchingMaxMessages
  - batchingMaxPublishDelayMs
Details
Create a Pulsar producer object that writes messages on the specified topic.
Return Value
A Pulsar producer object.
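The sketch below creates a producer with batching enabled. The broker address is an assumption, the values are illustrative, and the value types used for each key (INT for counts and timeouts, BOOL for the on/off switches) are assumptions, since the parameter list above does not state them.

```dolphindb
try { loadPlugin("pulsar") } catch(ex) { print(ex) }

// Assumption: a Pulsar broker is listening on localhost:6650
client = pulsar::client("pulsar://localhost:6650")

// Illustrative producer settings; value types (INT, BOOL) are assumptions
producerConfig = dict(STRING, ANY)
producerConfig["sendTimeoutMs"] = 30000
producerConfig["blockIfQueueFull"] = true
producerConfig["batchingEnabled"] = true
producerConfig["batchingMaxMessages"] = 1000
producerConfig["batchingMaxPublishDelayMs"] = 10

producer = pulsar::producer(client, "persistent://public/default/my-topic", producerConfig)
```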
send
Syntax
send(producer, message, [partitionKey])
Parameters
- producer: a Pulsar producer object.
- message: a string indicating the message to be sent.
- partitionKey (optional): a string indicating the partition key of the message. Pulsar uses the key to determine which partition of a partitioned topic the message is routed to.
Details
Send the specified message to the topic of producer.
Return Value
Void
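A short sketch of sending several messages with partition keys. The key values ("device-0", "device-1") are hypothetical, and the broker address is an assumption. On a partitioned topic, messages that share a key are routed to the same partition, which preserves their relative order.

```dolphindb
try { loadPlugin("pulsar") } catch(ex) { print(ex) }
client = pulsar::client("pulsar://localhost:6650")   // assumed broker address
producer = pulsar::producer(client, "persistent://public/default/my-topic")

// Send ten STRING messages, alternating between two hypothetical partition keys
for (i in 0..9) {
    partKey = "device-" + string(i % 2)
    pulsar::send(producer, "reading " + string(i), partKey)
}
```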
consumer
Syntax
consumer(client, topic, subscriptionName, [consumerConfig])
Parameters
- client: a Pulsar client object.
- topic: a string indicating the topic name.
- subscriptionName: a string indicating the subscription name.
- consumerConfig (optional): a dict(STRING, ANY) containing the following configuration parameters of the consumer:
  - consumerType (of INT type):
    - 0: ConsumerExclusive
    - 1: ConsumerShared
    - 2: ConsumerFailover
    - 3: ConsumerKeyShared
  - receiverQueueSize
  - maxTotalReceiverQueueSizeAcrossPartitions
  - unAckedMessagesTimeoutMs
  - subscriptionInitialPosition (of INT type):
    - 0: InitialPositionLatest
    - 1: InitialPositionEarliest
Details
Create a Pulsar consumer object that subscribes to the specified topic.
Return Value
A Pulsar consumer object.
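A minimal sketch of a consumer on a shared subscription that starts from the earliest retained message, using the INT codes listed for consumerType and subscriptionInitialPosition. The broker address and the subscription name "sub-shared" are hypothetical.

```dolphindb
try { loadPlugin("pulsar") } catch(ex) { print(ex) }
client = pulsar::client("pulsar://localhost:6650")   // assumed broker address

// Shared subscription that starts reading from the earliest retained message
consumerConfig = dict(STRING, ANY)
consumerConfig["consumerType"] = 1                  // 1: ConsumerShared
consumerConfig["subscriptionInitialPosition"] = 1   // 1: InitialPositionEarliest
consumerConfig["receiverQueueSize"] = 1000          // illustrative value

// "sub-shared" is a hypothetical subscription name
consumer = pulsar::consumer(client, "persistent://public/default/my-topic", "sub-shared", consumerConfig)
```

With a Shared subscription, several consumers using the same subscription name split the topic's messages between them; with ConsumerExclusive (0), Pulsar rejects a second consumer on that subscription.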
receive
Syntax
receive(consumer, [timeoutMs=1000])
Parameters
- consumer: a Pulsar consumer object.
- timeoutMs (optional): an integer (in milliseconds) indicating the receive timeout.
Details
Receive a message from the topic that consumer subscribes to, waiting at most timeoutMs milliseconds.
Return Value
A string indicating the received message.
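The sketch below calls receive with a longer timeout than the default. What receive does when no message arrives in time is not stated here, so the call is wrapped in try/catch on the assumption that a timeout may surface as an exception. The broker address is an assumption.

```dolphindb
try { loadPlugin("pulsar") } catch(ex) { print(ex) }
client = pulsar::client("pulsar://localhost:6650")   // assumed broker address
consumer = pulsar::consumer(client, "persistent://public/default/my-topic", "consumer-1")

// Collect received messages here
received = array(STRING, 0)
try {
    // Wait at most 5000 ms instead of the default 1000 ms
    msg = pulsar::receive(consumer, 5000)
    received.append!(msg)
} catch(ex) {
    // Assumption: a timeout or broker error is raised as an exception
    print(ex)
}
print(received)
```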
createSubJob
Syntax
createSubJob(jobName, client, topic, subscriptionName, table, parser, [consumerConfig])
Parameters
- jobName: a string indicating the name of the subscription job.
- client: a Pulsar client object.
- topic: a string indicating the topic name.
- subscriptionName: a string indicating the subscription name.
- table: a table to store the received data.
- parser: a function to process the received data. It takes 1 to 3 STRING parameters (the data, partition key, and topic of the message, in that order) and returns a table.
- consumerConfig (optional): a dict(STRING, ANY) containing the configuration parameters of the consumer. For details of the parameters, see consumer.
Details
Create a subscription job that subscribes to a topic and continuously receives data. The received data is processed by parser and stored in table.
Return Value
The handle of the subscription job.
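A sketch of a subscription job that uses the three-parameter form of parser. The table pulsarMsgs, the function parseMsg, and the names "pulsarJob1" and "sub-job" are hypothetical. It assumes the plugin appends each table returned by parser to table, so the parser's columns are chosen to match the target table's schema; sharing the table is a precaution because the job writes to it in the background.

```dolphindb
try { loadPlugin("pulsar") } catch(ex) { print(ex) }
client = pulsar::client("pulsar://localhost:6650")   // assumed broker address

// Hypothetical shared target table: one STRING column per parser output column
share table(10000:0, `msg`partKey`topicName, [STRING, STRING, STRING]) as pulsarMsgs

// Three-parameter parser: message data, partition key, topic (in that order)
def parseMsg(payload, partKey, topicName) {
    return table([payload] as msg, [partKey] as partKey, [topicName] as topicName)
}

// "pulsarJob1" and "sub-job" are hypothetical job and subscription names
job = pulsar::createSubJob("pulsarJob1", client, "persistent://public/default/my-topic", "sub-job", pulsarMsgs, parseMsg)
```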
cancelSubJob
Syntax
cancelSubJob(subJob)
Parameters
- subJob: the name (of STRING type) or handle of the subscription job.
Details
Cancel the subscription job.
Return Value
Void
getJobStat
Syntax
getJobStat()
Details
Get the information on all subscription jobs.
Return Value
A table containing the information on all subscription jobs.
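A short sketch of inspecting and then stopping subscription jobs. It assumes a job named "pulsarJob1" (a hypothetical name) was previously created with createSubJob; the columns of the getJobStat table are not described here, so the result is simply printed.

```dolphindb
try { loadPlugin("pulsar") } catch(ex) { print(ex) }

// List every subscription job created by the plugin
stat = pulsar::getJobStat()
print(stat)

// Cancel a job by the name given to createSubJob ("pulsarJob1" is hypothetical);
// the handle returned by createSubJob is also accepted
pulsar::cancelSubJob("pulsarJob1")

// List the jobs again (assumes cancelled jobs are no longer reported)
print(pulsar::getJobStat())
```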
getConfig
Syntax
getConfig(handle, configName)
Parameters
- handle: a Pulsar client, producer, consumer, or subscription job.
- configName: a string indicating the name of the configuration parameter. For details about the configuration parameters of client, producer, and consumer, refer to the corresponding sections.
Details
Get the value of the specified configuration parameter.
Return Value
The value of the specified configuration parameter.
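A minimal sketch of reading a setting back from a client handle. It creates the client with a known connectionTimeoutMs so the returned value can be checked; the broker address is an assumption.

```dolphindb
try { loadPlugin("pulsar") } catch(ex) { print(ex) }

// Create a client with a known setting (assumed broker address)
client = pulsar::client("pulsar://localhost:6650", dict(["connectionTimeoutMs"], [2000]))

// Read the setting back from the client handle
timeoutMs = pulsar::getConfig(client, "connectionTimeoutMs")
print(timeoutMs)
```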
Examples
loadPlugin("/path/to/PluginPulsar.txt");
// CLIENT
// define config dictionary
clientConfig = dict(["connectionTimeoutMs"], [2000])
client = pulsar::client("pulsar://localhost:6650", clientConfig)
// PRODUCER
producer = pulsar::producer(client, "persistent://public/default/my-topic")
pulsar::send(producer, "my message")
// CONSUMER
consumer = pulsar::consumer(client, "persistent://public/default/my-topic", "consumer-1")
pulsar::receive(consumer)
// SUBSCRIPTION JOB
// define parser
def parser(x) {
return table([x] as msg)