Kafka

With this plugin, you can easily publish or subscribe to Kafka streaming services. It supports serialization and deserialization of the following data types:

  • DolphinDB scalar types
  • Built-in types of Kafka Java API: String(UTF-8), Short, Integer, Long, Float, Double, Bytes, byte[] and ByteBuffer
  • Vector of types above

Pre-compiled installation

Dowload and unzip the folder to the root directory, and execute the following script on Linux:

export LD_LIBRARY_PATH="LD_LIBRARY_PATH:/path/to/bin/linux"

Start DolphinDB server on Linux. Run the following script in DolphinDB to load the plugin:

loadPlugin("/path/to/PluginKafka.txt")

Compiled installation

Prerequisites

Install CMake. For Ubuntu users (repalce apt with yum if you use Centos):

sudo apt install cmake

The project depends on 'cppkafka', which depends on 'boost' and 'librdkafka'. Download with the following script:

# The ubuntu which is a low version such as 14.04 will not 
# find rdkafka, and you need to compile the librdkafka manully.
# The address is https://github.com/edenhill/librdkafka

# For ubuntu install
sudo apt install librdkafka-dev
sudo apt install libboost-dev
sudo apt install libssl-dev

# For Centos install
sudo yum install librdkafka-devel
sudo yum install boost-devel
sudo yum install openssl-devel

cd /path/to/the/main/project/
git submodule update --init --recursive

If it is too slow to download submodule, you can download it with the cppkafka.git from the hidden file .gitModules.

git clone https://github.com/mfontanini/cppkafka.git

copy the libDolphinDB.so to bin/linux64 or /lib

cp /path/to/dolphindb/server/libDolphinDB.so /path/to/kafka/bin/linux64

Build with cmake

Build the project:

cd /path/to/DolphinDBPlugin/kafka
cd cppkafka
mkdir build
cd build
cmake ..
make
sudo make install
cd ../..
mkdir build
# copy the libDolphinDB.so to ./build
cp /path/to/dolphindb/server/libDolphinDB.so ./build
cd build
cmake ..
make

Move the result to bin/linux64

copy the .so and .txt to bin/linux64

cp /path/to/libPluginKafka.so /path/to/kafka/bin/linux64
cp /path/to/PluginKafka.txt /path/to/kafka/bin/linux64

API Details

Before loading the Kafka plugin, please download it and start Zookeeper and Kafka server. Please refer to this link for details.

Load the Kafka Plugin

Run the following script in DolphinDB to load the plugin (the directory need to be replaced with the path of luginKafka.txt):

loadPlugin("/path/to/PluginKafka.txt")

Producer

Initialization

Syntax

kafka::producer(config);

Arguments

  • config: A dictionary indicating the Kafka producer configuration, whose key is a string and value is a string or a boolean. Please refer to CONFIGURATION for more about Kafka configuration.

Details

Create a Kafka producer with specified configurations, and return the handler.

Produce Message

Syntax

kafka::produce(producer, topic, key, value, json, [partition] );

Arguments

  • producer: A Kafka producer handler.
  • topic: A string indicating the Kafka topic.
  • key: A Kafka key.
  • value: A Kafka value.
  • json: Indicates whether to transfer the data in json format or not.
  • partition: An optioanl parameter. It is an integer indicating the Kafka broker partition number.

Details

Produce key-value data in a specified partition.

Note:

Please don't send too much messages at once, otherwise an exception Local: Queue full might be thrown.

Producer flushing

Syntax

kafka::producerFlush(producer);

Arguments

  • producer: A Kafka producer handler.

Details

Flush all the messages of the producer.

Get blocking time

Syntax

kafka::getProducerTime(producer)

Arguments

  • producer: A Kafka producer handler.

Set blocking time

Syntax

kafka::setProducerTime(producer, timeout)

Arguments

  • producer: A Kafka producer handler.
  • timeout: The maximum amount of time you will wait for the response of a request.

Consumer

Initialization

Syntax

kafka::consumer(config)

Arguments

  • config: A dictionary indicating the Kafka consumer configuration, whose key is a string and value is an anyVector. Please refer to CONFIGURATION for more about Kafka configuration.

The following example is for users in SASL protocol:

consumerCfg = dict(string, any);
consumerCfg["metadata.broker.list"] = "localhost";
consumerCfg["group.id"] = "test";
consumerCfg["sasl.mechanisms"] = "PLAIN";
consumerCfg["security.protocol"] = "sasl_plaintext";
consumerCfg["sasl.username"] = "admin";
consumerCfg["sasl.password"] = "admin";
consumer = kafka::consumer(consumerCfg);
topics=["test"];
kafka::subscribe(consumer, topics);
kafka::consumerPoll(consumer);

Details

Create a Kafka consumer and return the handler.

Subscribe

Syntax

kafka::subscribe(consumer, topics)

Arguments

  • consumer: A Kafka consumer handler.
  • topics: A STRING vector indicating the topics to subscribe to.

Details

Subscribe a Kafka topic.

Unsubscribe

Syntax

kafka::unsubscribe(consumer)

Arguments

  • consumer: A Kafka consumer handler.

Details

Unsubscribe all topics.

Poll a message

Syntax

kafka::consumerPoll(consumer, [timeout])

Arguments

  • consumer: A Kafka consumer handler.
  • timeout: The maximum amount of time to wait for a polling.

Details

Save the subscribed data to DolphinDB, and return a tuple. The first element is a string indicating the error message. The second element is a tuple including the following elements: topic, partition, key, value and the timestamp when the consumer received the data.

kafka::consumerPoll will block the current thread and the poll default timeout is 1000 millisecond.

It is recommended to use function consumerPollBatch to submit multiple kafka::consumerPoll tasks.

Poll messages

Syntax

kafka::consumerPollBatch(consumer, batch_size, [time_out])

Arguments

  • consumer: A Kafka consumer handler.
  • batch_size: The number of messages you want to get.
  • timeout: The maximum amount of time to get messages.

Poll messages by multi-thread

Syntax

kafka::createSubJob(consumer, table, parser, description, [timeout])

Arguments

  • consumer: A Kafka consumer handler.
  • table: A table to store the messages.
  • parser: A function to deal with the input data, and it returns a table. You can use mseed::parser or the user-defined function.
  • description: A string to describe the thread.
  • timeout: The maximum amount of time to get each message.

Details

The parser need a string for input and return a table, and you can use mseed::parser for example or you can define a function by yourself.

Note:

If a task you created subscribes to a partition that has been subscribed by another task, messages wil be split and distributed to all tasks that subscribe to this partition. Therefore, the subscribed messages obtained from these tasks may be incomplete.

Get the muti-thread status

Syntax

kafka::getJobStat()

Arguments

  • None.

End the polling thread

Syntax

kafka::cancelSubJob(connection)

Arguments

  • connection: The result of the kafka::createSubJob, or the subscription id you get from function getJobStat(), which can be LONG, INT, and STRING data type.

Poll messages and return a dictionary

Syntax

kafka::pollDict(consumer, batch_size, [timeout])

Arguments

  • consumer: A Kafka consumer handler.
  • batch_size: The number of messages you want to get.
  • timeout: The maximum amount of time to get messages.

Details

Save the subscribed data to DolphinDB. It returns a DolphinDB dictionary containing messages in the form of key-value pair.

Commit

Syntax

kafka::commit(consumer)

Arguments

  • consumer: A Kafka consumer handler.

Details

Commit the offset of the last processed message to producer synchronously. If there is no lastest offset, an exception will be thrown.

Topic Commit

Syntax

kafka::commitTopic(consumer, topics, partitions, offsets)

Arguments

  • consumer: A Kafka consumer handler.
  • topics: A STRING vector indicating the topics to subscribe to.
  • partitions: An INT vector indicating the partition corresponding to each topic.
  • offsets: An INT vector indicating the offset corresponding to each topic.

AsyncCommit

Syntax

kafka::asyncCommit(consumer)

Arguments

  • consumer: A Kafka consumer handler.

Details

Commit the offset of the last processed message to producer asynchronously to _consumer_offset, a topic that is used to store messages.

AsyncCommitTopic

Syntax

kafka::asyncCommitTopic(consumer, topics, partitions, offsets)

Arguments

  • consumer: A Kafka consumer handler.
  • topics: A STRING vector indicating the topics to subscribe to.
  • partitions: An INT vector indicating the partition corresponding to each topic.
  • offsets: An INT vector indicating the offset corresponding to each topic.

Get blocking time

Syntax

kafka::getConsumerTime(consumer)

Arguments

  • consumer: A Kafka consumer handler.

Set blocking time

Syntax

kafka::setConsumerTime(consumer, timeout)

Arguments

  • consumer: A Kafka consumer handler.
  • timeout: The maximum amount of time to get messages.

Assign topic

Syntax

kafka::assign(consumer, topics, partitions, offsets)

Arguments

  • consumer: A Kafka consumer handler.
  • topics: A STRING vector indicating the topics to subscribe to.
  • partitions: An INT vector indicating the partition corresponding to each topic.
  • offsets: An INT vector indicating the offset corresponding to each topic.

Details

Unlike kafka::subscribe(consumer, topics), this function enables you to assign specific topics, partitions and offsets to the consumer.

Unassign topic

Syntax

kafka::unassign(consumer)

Arguments

  • consumer: A Kafka consumer handler.

Details

Unassign all topics of the consumer.

Get the assignment of the consumer

Syntax

kafka::getAssignment(consumer)

Arguments

  • consumer: A Kafka consumer handler.

Get offset

Syntax

kafka::getOffset(consumer, topic, partition)

Arguments

  • consumer: A Kafka consumer handler.
  • topic: A STRING vector indicating the topics to subscribe to.
  • partition: An INT vector indicating the partition corresponding to each topic.

Details

Print the offsets of the consumer.

Get Offset Committed

Syntax

kafka::getOffsetCommitted(consumer, topics, partitions, offsets, [timeout])

Arguments

  • consumer: A Kafka consumer handler.
  • topics: A STRING vector indicating the topics to subscribe to.
  • partitions: An INT vector indicating the partition corresponding to each topic.
  • offsets: An INT vector indicating the offset corresponding to each topic.
  • timeout: The maximum amount of time to get messages.

Details

Get the offsets committed for the given topic/partition list.

Get Offset Position

Syntax

kafka::getOffsetPosition(consumer, topics, partitions)

Arguments

  • consumer: A Kafka consumer handler.
  • topics: A STRING vector indicating the topics to subscribe to.
  • partitions: An INT vector indicating the partition corresponding to each topic.

Details

Get the offset positions for the given topic/partition list.

Store Consumed Offsets

Syntax

kafka::storeConsumedOffset(consumer)

Arguments

  • consumer: A Kafka consumer handler.

Details

Store the offsets on the currently assigned topic/partitions (legacy).

Please set enable.auto.offset.store=false, enable.auto.commit=true for consumer, otherwise an error will be reported.

Store Offsets

Syntax

storeOffset(consumer, topics, partitions, offsets)

Arguments

  • consumer: A Kafka consumer handler.
  • topics: A STRING vector indicating the topics to subscribe to.
  • partitions: An INT vector indicating the partition corresponding to each topic.
  • offsets: An INT vector indicating the offset corresponding to each topic.

Details

Store the offsets on the given topic/partitions (legacy).

Please set enable.auto.offset.store=false, enable.auto.commit=true for consumer, otherwise an error will be reported.

Get Member ID

Syntax

kafka::getMemId(consumer)

Arguments

  • consumer: A Kafka consumer handler.

Details

Get the group member ID.

Queue

Get Main Queue

Syntax

kafka::getMainQueue(consumer)

Arguments

  • consumer: A Kafka consumer handler.

Details

Get the global event queue corresponding to the consumer.

Get Consumer Queue

Syntax

kafka::getConsumerQueue(consumer)

Arguments

  • consumer: A Kafka consumer handler.

Details

Get the consumer group queue.

Get Partition Queue

Syntax

kafka::getPartitionQueue(consumer, topic, partition)

Arguments

  • consumer: A Kafka consumer handler.
  • topic: A STRING vector indicating the topics to subscribe to.
  • partition: An INT vector indicating the partition corresponding to each topic.

Details

Get the queue of this partition. If the consumer is not assigned to this partition, an empty queue will be returned.

Get queue length

Syntax

kafka::queueLength(queue)

Arguments

  • queue: A Kafka queue handler.

Details

Returns the length of the queue

Note:

It is not recommended to use this function if it is deployed on the ARM architecture servers. The result may not be as expected.

Forward to queue

Syntax

kafka::forToQueue(queue, forward_queue)

Arguments

  • queue: A Kafka queue handler.
  • forward_queue: A Kafka queue handler, indicating the target that the queue forward to.

Stop forwarding to queue

Syntax

kafka::disforToQueue(queue)

Arguments

  • queue: A Kafka queue handler.

Details

Stop forwarding to another queue.

Set queue timeout

Syntax

kafka::setQueueTime(queue, timeout)

Arguments

  • queue: A Kafka queue handler.
  • timeout: The maximum amount of time for the queue.

Get queue timeout

Syntax

kafka::getQueueTime(queue)

Arguments

  • queue: A Kafka queue handler.

Details

Get the configured timeout.

Poll message of queue

Syntax

kafka::queuePoll(queue, [timeout])

Arguments

  • queue: A Kafka queue handler.
  • timeout: The maximum amount of time for the queue.

Poll messages of queue

Syntax

kafka::queuePollBatch(queue, batch_size, [timeout])

Arguments

  • queue: A Kafka queue handler.
  • batch_size: The number of messages you want to get.
  • timeout: The maximum amount of time for the queue.

Event

Get event from a queue

Syntax

kafka::queueEvent(queue)

Arguments

  • queue: A Kafka queue handler.

Details

Extract the next event in this queue.

Note:

Before deleting a consumer, please ensure that the events generated by it is still in its lifecycle (specify event=NULL to release resources), otherwise the program may collapse.

Get event name

Syntax

kafka::getEventName(event)

Arguments

  • event: A Kafka event handler.

Details

Return the name of the event.

Get messages from the event

Syntax

kafka::eventGetMessage(event)

Arguments

  • event: A Kafka event handler.

Details

Get all messages in this event (if any).

Get the count of messages from the event

Syntax

kafka::getEventMessageCount(event)

Arguments

  • event: A Kafka event handler.

Get errors of the event

Syntax

kafka::eventGetError(event)

Arguments

  • event: A Kafka event handler.

Details

Return error messages in this event.

Get the partition of the event

Syntax

kafka::eventGetPart(event)

Arguments

  • event: A Kafka event handler.

Get partitions of the event

Syntax

kafka::eventGetParts(event)

Arguments

  • event: A Kafka event handler.

Determine whether it is an event or not

Syntax

kafka::eventBool(event)

Arguments

  • event: A Kafka event handler.

Global Setting

Get Buffer Size

Syntax

kafka::getBufferSize()

Set Buffer Size

Syntax

kafka::setBufferSize(size)

Arguments

  • size: The capacity of the buffer size you want to set, which is no larger than the buffer size of the broker, and the default value is 900k.

Get Message Size

Syntax

kafka::getMessageSize()

Set Message Size

Syntax

kafka::setMessageSize(size)

Arguments

  • size: The capacity of the message size you want to set. The message_size is no larger than the buffer_size, and the default value is 10k.

Get the Consumer Handler of a Subscription

Syntax

kafka::getSubJobConsumer(connection)

Arguments

  • connection: The result of the kafka::createSubJob, or the subscription id you get from function getJobStat(), which can be LONG, INT, and STRING data type.

Details

Get the consumer handler of a specified subscription.

Example

#create producer
producerCfg = dict(STRING, ANY);
producerCfg["metadata.broker.list"] = "localhost";
producer = kafka::producer(producerCfg);

#create consumer
consumerCfg = dict(string, any);
consumerCfg["metadata.broker.list"] = "localhost";
consumerCfg["group.id"] = "test";
consumer = kafka::consumer(consumerCfg);

#subscribe
topics=["test"];
kafka::subscribe(consumer, topics);
kafka::consumerPoll(consumer);

#produce and consume English string
kafka::produce(producer, "test", "1", "producer1:i'm producer",false,false);
kafka::consumerPoll(consumer);
#produce and consume Chinese string
    kafka::produce(producer, "test", "2", "I am a producer",false,false);
kafka::consumerPoll(consumer);
#produce and consume integer
kafka::produce(producer, "test", "3", 10086,false,false);
kafka::consumerPoll(consumer);
#produce and consume float
kafka::produce(producer, "test", "4", 123.456,false,false);
kafka::consumerPoll(consumer);
#produce and consume integer vector
message=[1,2,3,4];
kafka::produce(producer, "test", 1, message,false,false);
kafka::consumerPoll(consumer);
#produce and consume float vector
message=[1.1,2.2,3.3,4.4];
kafka::produce(producer, "test", 1, message,false,false);
kafka::consumerPoll(consumer);
#produce and consume Chinese string vector
message=["I","I am","I am a","I am a producer","I am a producer"];
kafka::produce(producer, "test", 1, message,false,false);
kafka::consumerPoll(consumer);
#produce and consume table
tab=table(1 2 3 as a, `x`y`z as b, 10.8 7.6 3.5 as c, "I" "I am" "I am a" as d);
kafka::produce(producer, "test", 1, tab,false,false);
kafka::consumerPoll(consumer);

#produce and consume two messages
kafka::produce(producer, "test", 1, "producer1:i'm producer",false,false);
kafka::produce(producer, "test", 1, "I am a producer",false,false);
kafka::consumerPollBatch(consumer,2);

#assign specific partition and offset
topics = ["test"];
partitions = [0];
offsets = [0];
kafka::unassign(consumer);
kafka::assign(consumer,topics,partitions,offsets);

#produce and consumer messages
kafka::produce(producer, "test", "1", "producer1:i'm producer",false,0);
kafka::produce(producer, "test", "2", "I am a producer",false,0);
kafka::produce(producer, "test", "3", 10086,false,0);
kafka::produce(producer, "test", "4", 123.456,false,0);
kafka::consumerPoll(consumer);
kafka::consumerPoll(consumer);
kafka::consumerPoll(consumer);
kafka::consumerPoll(consumer);

#Get the size of specific partitions
kafka::getOffsetCommitted(consumer,topics,partitions,offsets);
kafka::getAssignment(consumer);
kafka::getOffset(consumer,"test",2);
#Get the size of the current offset
kafka::getOffsetPosition(consumer,topics,partitions);

#deal with queue
queue=kafka::getConsumerQueue(consumer);
kafka::queueLength(queue);
kafka::queuePoll(queue);

#deal with event
event=kafka::queueEvent(queue);
kafka::getEventName(event);
kafka::eventGetMessage(event);
kafka::getEventMessageCount(event);
kafka::eventGetPart(event);
kafka::eventGetError(event);
kafka::eventBool(event);

#get a dictionary
kafka::produce(producer, "test", "1", "producer1:i'm producer",false,false,0);
kafka::produce(producer, "test", "2", "I am a producer",false,false,0);
kafka::produce(producer, "test", "3", 10086,false,false,0);
kafka::produce(producer, "test", "4", 123.456,false,false,0);
kafka::pollDict(consumer,4);

#get messages in json format
tab=table(1 2 3 as a, `x`y`z as b, 10.8 7.6 3.5 as c, "I" "I am" "I am a" as d);
dict={"1":1,"2":2,"3":3};
message=[1.1,2.2,3.3,4.4];
vec=[1,message,tab,];
kafka::produce(producer, "test", "1", tab,true,false,0);
kafka::consumerPoll(consumer);
kafka::produce(producer, "test", "1", dict,true,false,0);
kafka::consumerPoll(consumer);
kafka::produce(producer, "test", "1", message,true,false,0);
kafka::consumerPoll(consumer);
kafka::produce(producer, "test", "1", vec,true,false,0);
kafka::consumerPoll(consumer);

#change the buffer_size and message_size
kafka::getBufferSize();
kafka::getMessageSize();
kafka::setBufferSize(100);
kafka::setMessageSize(20);
a=[];
for(i in 0:120){a.append!(i%10)};
kafka::produce(producer,"test","1",a,false,false,0);
kafka::consumerPoll(consumer);
kafka::produce(producer,"test","1",tab,false,false,0);
kafka::consumerPoll(consumer);

#mult-thread
#the multithreading function need a parser, you can install mseed as an example

loadPlugin("/path/to/PluginKafka.txt");
loadPlugin("/path/to/PluginMseed.txt")

consumerCfg = dict(string, any);
consumerCfg["metadata.broker.list"] = "115.239.209.234";
consumerCfg["group.id"] = "test";
consumer = kafka::consumer(consumerCfg);

topics=["test"];
kafka::subscribe(consumer, topics);
tab = table(40000000:0,`id`time`value,[SYMBOL,TIMESTAMP,INT])

conn = kafka::createSubJob(consumer,tab,mseed::parse,"test:0:get mseed data");
kafka::getJobStat();
kafka::cancelSubJob(conn);