Kafka
With this plugin, you can easily publish or subscribe to Kafka streaming services. It supports serialization and deserialization of the following data types:
- DolphinDB scalar types
- Built-in types of Kafka Java API: String(UTF-8), Short, Integer, Long, Float, Double, Bytes, byte[] and ByteBuffer
- Vectors of the types above
Pre-compiled installation
Download and unzip the folder to the root directory, and execute the following script on Linux:
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/path/to/bin/linux"
Start DolphinDB server on Linux. Run the following script in DolphinDB to load the plugin:
loadPlugin("/path/to/PluginKafka.txt")
Compiled installation
Prerequisites
Install CMake. For Ubuntu users (replace apt with yum if you use CentOS):
sudo apt install cmake
The project depends on 'cppkafka', which depends on 'boost' and 'librdkafka'. Download with the following script:
# On older Ubuntu releases such as 14.04, apt cannot find librdkafka,
# so you need to compile librdkafka manually.
# The source is available at https://github.com/edenhill/librdkafka
# Install on Ubuntu
sudo apt install librdkafka-dev
sudo apt install libboost-dev
sudo apt install libssl-dev
# Install on CentOS
sudo yum install librdkafka-devel
sudo yum install boost-devel
sudo yum install openssl-devel
cd /path/to/the/main/project/
git submodule update --init --recursive
If downloading the submodule is too slow, you can clone cppkafka directly from the URL listed in the hidden file .gitmodules:
git clone https://github.com/mfontanini/cppkafka.git
Copy libDolphinDB.so to bin/linux64 or /lib:
cp /path/to/dolphindb/server/libDolphinDB.so /path/to/kafka/bin/linux64
Build with cmake
Build the project:
cd /path/to/DolphinDBPlugin/kafka
cd cppkafka
mkdir build
cd build
cmake ..
make
sudo make install
cd ../..
mkdir build
# copy the libDolphinDB.so to ./build
cp /path/to/dolphindb/server/libDolphinDB.so ./build
cd build
cmake ..
make
Move the result to bin/linux64
Copy the .so and .txt files to bin/linux64:
cp /path/to/libPluginKafka.so /path/to/kafka/bin/linux64
cp /path/to/PluginKafka.txt /path/to/kafka/bin/linux64
API Details
Before loading the Kafka plugin, please download it and start Zookeeper and Kafka server. Please refer to this link for details.
Load the Kafka Plugin
Run the following script in DolphinDB to load the plugin (replace the directory with the path of PluginKafka.txt):
loadPlugin("/path/to/PluginKafka.txt")
Producer
Initialization
Syntax
kafka::producer(config);
Arguments
- config: A dictionary indicating the Kafka producer configuration, whose keys are strings and whose values are strings or booleans. Please refer to CONFIGURATION for more about Kafka configuration.
Details
Create a Kafka producer with specified configurations, and return the handler.
Produce Message
Syntax
kafka::produce(producer, topic, key, value, json, [partition] );
Arguments
- producer: A Kafka producer handler.
- topic: A string indicating the Kafka topic.
- key: A Kafka key.
- value: A Kafka value.
- json: Indicates whether to transfer the data in json format or not.
- partition: An optional parameter. It is an integer indicating the Kafka broker partition number.
Details
Produce key-value data in a specified partition.
Note:
Do not send too many messages at once; otherwise the exception "Local: Queue full" might be thrown.
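One way to stay below the local queue limit is to send messages in small batches and flush between them. The following is a minimal sketch rather than a recommended setting: the batch size of 100 and the topic name "test" are illustrative, and it assumes that kafka::producerFlush returns only after the queued messages have been handed to the broker.

```dolphindb
// Assumes `producer` was created with kafka::producer and that the topic "test" exists.
batchSize = 100    // hypothetical batch size; tune it for your broker
for (i in 0:1000) {
    // key is the loop index as a string, value is the integer itself, json = false
    kafka::produce(producer, "test", string(i), i, false)
    // After every batchSize messages, flush before sending more.
    if ((i + 1) % batchSize == 0) {
        kafka::producerFlush(producer)
    }
}
// Flush whatever is still queued after the loop.
kafka::producerFlush(producer)
```

If the exception still appears, a smaller batch size is the first thing to try.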
Producer flushing
Syntax
kafka::producerFlush(producer);
Arguments
- producer: A Kafka producer handler.
Details
Flush all the messages of the producer.
Get blocking time
Syntax
kafka::getProducerTime(producer)
Arguments
- producer: A Kafka producer handler.
Set blocking time
Syntax
kafka::setProducerTime(producer, timeout)
Arguments
- producer: A Kafka producer handler.
- timeout: The maximum amount of time you will wait for the response of a request.
Consumer
Initialization
Syntax
kafka::consumer(config)
Arguments
- config: A dictionary indicating the Kafka consumer configuration, with STRING keys and ANY values. Please refer to CONFIGURATION for more about Kafka configuration.
The following example configures a consumer that authenticates with SASL:
consumerCfg = dict(STRING, ANY);
consumerCfg["metadata.broker.list"] = "localhost";
consumerCfg["group.id"] = "test";
consumerCfg["sasl.mechanisms"] = "PLAIN";
consumerCfg["security.protocol"] = "sasl_plaintext";
consumerCfg["sasl.username"] = "admin";
consumerCfg["sasl.password"] = "admin";
consumer = kafka::consumer(consumerCfg);
topics=["test"];
kafka::subscribe(consumer, topics);
kafka::consumerPoll(consumer);
Details
Create a Kafka consumer and return the handler.
Subscribe
Syntax
kafka::subscribe(consumer, topics)
Arguments
- consumer: A Kafka consumer handler.
- topics: A STRING vector indicating the topics to subscribe to.
Details
Subscribe to the specified Kafka topics.
Unsubscribe
Syntax
kafka::unsubscribe(consumer)
Arguments
- consumer: A Kafka consumer handler.
Details
Unsubscribe all topics.
Poll a message
Syntax
kafka::consumerPoll(consumer, [timeout])
Arguments
- consumer: A Kafka consumer handler.
- timeout: The maximum amount of time to wait for a poll.
Details
Save the subscribed data to DolphinDB, and return a tuple. The first element is a string indicating the error message. The second element is a tuple containing the following elements: topic, partition, key, value, and the timestamp when the consumer received the data.
kafka::consumerPoll blocks the current thread, and the default poll timeout is 1000 milliseconds.
It is recommended to use the function kafka::consumerPollBatch to submit multiple kafka::consumerPoll tasks.
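The nested tuple described above can be unpacked by position. This is a minimal sketch under stated assumptions: `consumer` is already subscribed to a topic, and a failed or timed-out poll may return a second element with fewer than five items (the source does not say what it returns in that case), so the sketch checks the size before indexing.

```dolphindb
// Assumes `consumer` is an existing handler that has subscribed to a topic.
res = kafka::consumerPoll(consumer)
errMsg = res[0]    // first element: the error message string
msg = res[1]       // second element: topic, partition, key, value, timestamp
// Guard: only unpack when all five documented elements are present.
if (size(msg) == 5) {
    topic = msg[0]
    partition = msg[1]
    key = msg[2]
    value = msg[3]
    receivedAt = msg[4]
    print(value)
} else {
    print(errMsg)
}
```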
Poll messages
Syntax
kafka::consumerPollBatch(consumer, batch_size, [timeout])
Arguments
- consumer: A Kafka consumer handler.
- batch_size: The number of messages you want to get.
- timeout: The maximum amount of time to get messages.
Poll messages by multi-thread
Syntax
kafka::createSubJob(consumer, table, parser, description, [timeout])
Arguments
- consumer: A Kafka consumer handler.
- table: A table to store the messages.
- parser: A function that processes the input data and returns a table. You can use mseed::parse or a user-defined function.
- description: A string to describe the thread.
- timeout: The maximum amount of time to get each message.
Details
The parser takes a string as input and returns a table. You can use mseed::parse, for example, or define your own function.
Note:
If a task you created subscribes to a partition that has already been subscribed to by another task, messages will be split and distributed among all tasks that subscribe to this partition. Therefore, the messages obtained by each of these tasks may be incomplete.
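A user-defined parser only has to accept one string and return a table whose columns match the target table. The sketch below is illustrative: the function name csvParser, the comma-separated message layout such as "sensor1,42", and the two-column schema are all assumptions, not part of the plugin.

```dolphindb
// Hypothetical parser: assumes every message is a string like "sensor1,42".
def csvParser(msg) {
    fields = split(msg, ",")
    // Build a one-row table: first field as id, second field converted to INT.
    return table([fields[0]] as id, [int(fields[1])] as value)
}

// Target table whose schema matches the parser output.
tab = table(10000:0, `id`value, [STRING, INT])
// Assumes `consumer` is an existing handler that has subscribed to a topic.
conn = kafka::createSubJob(consumer, tab, csvParser, "csv messages from test")
```

To avoid the split-message situation described in the note, this sketch uses a single job per consumer.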
Get the multi-thread status
Syntax
kafka::getJobStat()
Arguments
- None.
End the polling thread
Syntax
kafka::cancelSubJob(connection)
Arguments
- connection: The result of kafka::createSubJob, or the subscription ID obtained from the function getJobStat(). It can be of LONG, INT, or STRING data type.
Poll messages and return a dictionary
Syntax
kafka::pollDict(consumer, batch_size, [timeout])
Arguments
- consumer: A Kafka consumer handler.
- batch_size: The number of messages you want to get.
- timeout: The maximum amount of time to get messages.
Details
Save the subscribed data to DolphinDB. It returns a DolphinDB dictionary containing messages in the form of key-value pair.
Commit
Syntax
kafka::commit(consumer)
Arguments
- consumer: A Kafka consumer handler.
Details
Commit the offset of the last processed message to the broker synchronously. If there is no latest offset, an exception will be thrown.
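A manual commit is typically paired with automatic commits turned off. The sketch below assumes a broker on localhost, a topic named "test", and the librdkafka property enable.auto.commit; because the function throws when there is no offset to commit, the call is wrapped in try/catch.

```dolphindb
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "localhost"    // assumed broker address
consumerCfg["group.id"] = "test"
consumerCfg["enable.auto.commit"] = "false"          // commit manually instead
consumer = kafka::consumer(consumerCfg)
kafka::subscribe(consumer, ["test"])

res = kafka::consumerPoll(consumer)
// ... process res here ...
try {
    // Throws if no message has been consumed yet, so there is nothing to commit.
    kafka::commit(consumer)
} catch (ex) {
    print(ex)
}
```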
Topic Commit
Syntax
kafka::commitTopic(consumer, topics, partitions, offsets)
Arguments
- consumer: A Kafka consumer handler.
- topics: A STRING vector indicating the topics to subscribe to.
- partitions: An INT vector indicating the partition corresponding to each topic.
- offsets: An INT vector indicating the offset corresponding to each topic.
AsyncCommit
Syntax
kafka::asyncCommit(consumer)
Arguments
- consumer: A Kafka consumer handler.
Details
Commit the offset of the last processed message asynchronously to __consumer_offsets, the internal Kafka topic that stores committed offsets.
AsyncCommitTopic
Syntax
kafka::asyncCommitTopic(consumer, topics, partitions, offsets)
Arguments
- consumer: A Kafka consumer handler.
- topics: A STRING vector indicating the topics to subscribe to.
- partitions: An INT vector indicating the partition corresponding to each topic.
- offsets: An INT vector indicating the offset corresponding to each topic.
Get blocking time
Syntax
kafka::getConsumerTime(consumer)
Arguments
- consumer: A Kafka consumer handler.
Set blocking time
Syntax
kafka::setConsumerTime(consumer, timeout)
Arguments
- consumer: A Kafka consumer handler.
- timeout: The maximum amount of time to get messages.
Assign topic
Syntax
kafka::assign(consumer, topics, partitions, offsets)
Arguments
- consumer: A Kafka consumer handler.
- topics: A STRING vector indicating the topics to subscribe to.
- partitions: An INT vector indicating the partition corresponding to each topic.
- offsets: An INT vector indicating the offset corresponding to each topic.
Details
Unlike kafka::subscribe(consumer, topics), this function enables you to assign specific topics, partitions and offsets to the consumer.
Unassign topic
Syntax
kafka::unassign(consumer)
Arguments
- consumer: A Kafka consumer handler.
Details
Unassign all topics of the consumer.
Get the assignment of the consumer
Syntax
kafka::getAssignment(consumer)
Arguments
- consumer: A Kafka consumer handler.
Get offset
Syntax
kafka::getOffset(consumer, topic, partition)
Arguments
- consumer: A Kafka consumer handler.
- topic: A STRING vector indicating the topics to subscribe to.
- partition: An INT vector indicating the partition corresponding to each topic.
Details
Print the offsets of the consumer.
Get Offset Committed
Syntax
kafka::getOffsetCommitted(consumer, topics, partitions, offsets, [timeout])
Arguments
- consumer: A Kafka consumer handler.
- topics: A STRING vector indicating the topics to subscribe to.
- partitions: An INT vector indicating the partition corresponding to each topic.
- offsets: An INT vector indicating the offset corresponding to each topic.
- timeout: The maximum amount of time to get messages.
Details
Get the offsets committed for the given topic/partition list.
Get Offset Position
Syntax
kafka::getOffsetPosition(consumer, topics, partitions)
Arguments
- consumer: A Kafka consumer handler.
- topics: A STRING vector indicating the topics to subscribe to.
- partitions: An INT vector indicating the partition corresponding to each topic.
Details
Get the offset positions for the given topic/partition list.
Store Consumed Offsets
Syntax
kafka::storeConsumedOffset(consumer)
Arguments
- consumer: A Kafka consumer handler.
Details
Store the offsets on the currently assigned topic/partitions (legacy).
Please set enable.auto.offset.store=false, enable.auto.commit=true for consumer, otherwise an error will be reported.
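The two required properties can be set when the consumer is created. This is a minimal sketch assuming a broker on localhost and a topic named "test"; only the two property values come from the requirement above.

```dolphindb
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "localhost"      // assumed broker address
consumerCfg["group.id"] = "test"
consumerCfg["enable.auto.offset.store"] = "false"      // offsets are stored explicitly
consumerCfg["enable.auto.commit"] = "true"             // stored offsets are committed automatically
consumer = kafka::consumer(consumerCfg)
kafka::subscribe(consumer, ["test"])

kafka::consumerPoll(consumer)
// Store the offsets of what has been consumed so far.
kafka::storeConsumedOffset(consumer)
```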
Store Offsets
Syntax
kafka::storeOffset(consumer, topics, partitions, offsets)
Arguments
- consumer: A Kafka consumer handler.
- topics: A STRING vector indicating the topics to subscribe to.
- partitions: An INT vector indicating the partition corresponding to each topic.
- offsets: An INT vector indicating the offset corresponding to each topic.
Details
Store the offsets on the given topic/partitions (legacy).
Please set enable.auto.offset.store=false, enable.auto.commit=true for consumer, otherwise an error will be reported.
Get Member ID
Syntax
kafka::getMemId(consumer)
Arguments
- consumer: A Kafka consumer handler.
Details
Get the group member ID.
Queue
Get Main Queue
Syntax
kafka::getMainQueue(consumer)
Arguments
- consumer: A Kafka consumer handler.
Details
Get the global event queue corresponding to the consumer.
Get Consumer Queue
Syntax
kafka::getConsumerQueue(consumer)
Arguments
- consumer: A Kafka consumer handler.
Details
Get the consumer group queue.
Get Partition Queue
Syntax
kafka::getPartitionQueue(consumer, topic, partition)
Arguments
- consumer: A Kafka consumer handler.
- topic: A STRING vector indicating the topics to subscribe to.
- partition: An INT vector indicating the partition corresponding to each topic.
Details
Get the queue of this partition. If the consumer is not assigned to this partition, an empty queue will be returned.
Get queue length
Syntax
kafka::queueLength(queue)
Arguments
- queue: A Kafka queue handler.
Details
Return the length of the queue.
Note:
It is not recommended to use this function on ARM architecture servers, as the result may not be as expected.
Forward to queue
Syntax
kafka::forToQueue(queue, forward_queue)
Arguments
- queue: A Kafka queue handler.
- forward_queue: A Kafka queue handler indicating the target queue to forward to.
Stop forwarding to queue
Syntax
kafka::disforToQueue(queue)
Arguments
- queue: A Kafka queue handler.
Details
Stop forwarding to another queue.
Set queue timeout
Syntax
kafka::setQueueTime(queue, timeout)
Arguments
- queue: A Kafka queue handler.
- timeout: The maximum amount of time for the queue.
Get queue timeout
Syntax
kafka::getQueueTime(queue)
Arguments
- queue: A Kafka queue handler.
Details
Get the configured timeout.
Poll message of queue
Syntax
kafka::queuePoll(queue, [timeout])
Arguments
- queue: A Kafka queue handler.
- timeout: The maximum amount of time for the queue.
Poll messages of queue
Syntax
kafka::queuePollBatch(queue, batch_size, [timeout])
Arguments
- queue: A Kafka queue handler.
- batch_size: The number of messages you want to get.
- timeout: The maximum amount of time for the queue.
Event
Get event from a queue
Syntax
kafka::queueEvent(queue)
Arguments
- queue: A Kafka queue handler.
Details
Extract the next event in this queue.
Note:
Before deleting a consumer, make sure that the events generated by it have been released (set event=NULL to release the resources); otherwise the program may crash.
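The release order in the note can be sketched as follows. It assumes that `consumer` is an existing handler and that assigning NULL to a handler variable is how the handler is deleted; the source only states this for events, so treating the consumer the same way is an assumption.

```dolphindb
// Assumes `consumer` is an existing consumer handler.
queue = kafka::getConsumerQueue(consumer)
event = kafka::queueEvent(queue)
print(kafka::getEventName(event))

// Release the event first, as the note requires.
event = NULL
// Only then drop the consumer (assumed to work the same way as for events).
consumer = NULL
```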
Get event name
Syntax
kafka::getEventName(event)
Arguments
- event: A Kafka event handler.
Details
Return the name of the event.
Get messages from the event
Syntax
kafka::eventGetMessage(event)
Arguments
- event: A Kafka event handler.
Details
Get all messages in this event (if any).
Get the count of messages from the event
Syntax
kafka::getEventMessageCount(event)
Arguments
- event: A Kafka event handler.
Get errors of the event
Syntax
kafka::eventGetError(event)
Arguments
- event: A Kafka event handler.
Details
Return error messages in this event.
Get the partition of the event
Syntax
kafka::eventGetPart(event)
Arguments
- event: A Kafka event handler.
Get partitions of the event
Syntax
kafka::eventGetParts(event)
Arguments
- event: A Kafka event handler.
Determine whether it is an event or not
Syntax
kafka::eventBool(event)
Arguments
- event: A Kafka event handler.
Global Setting
Get Buffer Size
Syntax
kafka::getBufferSize()
Set Buffer Size
Syntax
kafka::setBufferSize(size)
Arguments
- size: The buffer size to set. It must be no larger than the buffer size of the broker. The default value is 900k.
Get Message Size
Syntax
kafka::getMessageSize()
Set Message Size
Syntax
kafka::setMessageSize(size)
Arguments
- size: The message size to set. The message_size must be no larger than the buffer_size. The default value is 10k.
Get the Consumer Handler of a Subscription
Syntax
kafka::getSubJobConsumer(connection)
Arguments
- connection: The result of kafka::createSubJob, or the subscription ID obtained from the function getJobStat(). It can be of LONG, INT, or STRING data type.
Details
Get the consumer handler of a specified subscription.
Example
// create producer
producerCfg = dict(STRING, ANY);
producerCfg["metadata.broker.list"] = "localhost";
producer = kafka::producer(producerCfg);
// create consumer
consumerCfg = dict(STRING, ANY);
consumerCfg["metadata.broker.list"] = "localhost";
consumerCfg["group.id"] = "test";
consumer = kafka::consumer(consumerCfg);
// subscribe
topics=["test"];
kafka::subscribe(consumer, topics);
kafka::consumerPoll(consumer);
// produce and consume English string
kafka::produce(producer, "test", "1", "producer1:i'm producer",false);
kafka::consumerPoll(consumer);
// produce and consume Chinese string
kafka::produce(producer, "test", "2", "I am a producer",false);
kafka::consumerPoll(consumer);
// produce and consume integer
kafka::produce(producer, "test", "3", 10086,false);
kafka::consumerPoll(consumer);
// produce and consume float
kafka::produce(producer, "test", "4", 123.456,false);
kafka::consumerPoll(consumer);
// produce and consume integer vector
message=[1,2,3,4];
kafka::produce(producer, "test", 1, message,false);
kafka::consumerPoll(consumer);
// produce and consume float vector
message=[1.1,2.2,3.3,4.4];
kafka::produce(producer, "test", 1, message,false);
kafka::consumerPoll(consumer);
// produce and consume Chinese string vector
message=["I","I am","I am a","I am a producer","I am a producer"];
kafka::produce(producer, "test", 1, message,false);
kafka::consumerPoll(consumer);
// produce and consume table
tab=table(1 2 3 as a, `x`y`z as b, 10.8 7.6 3.5 as c, "I" "I am" "I am a" as d);
kafka::produce(producer, "test", 1, tab,false);
kafka::consumerPoll(consumer);
// produce and consume two messages
kafka::produce(producer, "test", 1, "producer1:i'm producer",false);
kafka::produce(producer, "test", 1, "I am a producer",false);
kafka::consumerPollBatch(consumer,2);
// assign specific partition and offset
topics = ["test"];
partitions = [0];
offsets = [0];
kafka::unassign(consumer);
kafka::assign(consumer,topics,partitions,offsets);
// produce and consume messages
kafka::produce(producer, "test", "1", "producer1:i'm producer",false,0);
kafka::produce(producer, "test", "2", "I am a producer",false,0);
kafka::produce(producer, "test", "3", 10086,false,0);
kafka::produce(producer, "test", "4", 123.456,false,0);
kafka::consumerPoll(consumer);
kafka::consumerPoll(consumer);
kafka::consumerPoll(consumer);
kafka::consumerPoll(consumer);
// get the size of specific partitions
kafka::getOffsetCommitted(consumer,topics,partitions,offsets);
kafka::getAssignment(consumer);
kafka::getOffset(consumer,"test",2);
// get the size of the current offset
kafka::getOffsetPosition(consumer,topics,partitions);
// deal with queue
queue=kafka::getConsumerQueue(consumer);
kafka::queueLength(queue);
kafka::queuePoll(queue);
// deal with event
event=kafka::queueEvent(queue);
kafka::getEventName(event);
kafka::eventGetMessage(event);
kafka::getEventMessageCount(event);
kafka::eventGetPart(event);
kafka::eventGetError(event);
kafka::eventBool(event);
// get a dictionary
kafka::produce(producer, "test", "1", "producer1:i'm producer",false,0);
kafka::produce(producer, "test", "2", "I am a producer",false,0);
kafka::produce(producer, "test", "3", 10086,false,0);
kafka::produce(producer, "test", "4", 123.456,false,0);
kafka::pollDict(consumer,4);
// get messages in json format
tab=table(1 2 3 as a, `x`y`z as b, 10.8 7.6 3.5 as c, "I" "I am" "I am a" as d);
dict={"1":1,"2":2,"3":3};
message=[1.1,2.2,3.3,4.4];
vec=[1,message,tab,];
kafka::produce(producer, "test", "1", tab,true,0);
kafka::consumerPoll(consumer);
kafka::produce(producer, "test", "1", dict,true,0);
kafka::consumerPoll(consumer);
kafka::produce(producer, "test", "1", message,true,0);
kafka::consumerPoll(consumer);
kafka::produce(producer, "test", "1", vec,true,0);
kafka::consumerPoll(consumer);
// change the buffer_size and message_size
kafka::getBufferSize();
kafka::getMessageSize();
kafka::setBufferSize(100);
kafka::setMessageSize(20);
a=[];
for(i in 0:120){a.append!(i%10)};
kafka::produce(producer,"test","1",a,false,0);
kafka::consumerPoll(consumer);
kafka::produce(producer,"test","1",tab,false,0);
kafka::consumerPoll(consumer);
// multi-thread
// the multithreading function needs a parser; you can install the mseed plugin as an example
loadPlugin("/path/to/PluginKafka.txt");
loadPlugin("/path/to/PluginMseed.txt");
consumerCfg = dict(STRING, ANY);
consumerCfg["metadata.broker.list"] = "115.239.209.234";
consumerCfg["group.id"] = "test";
consumer = kafka::consumer(consumerCfg);
topics=["test"];
kafka::subscribe(consumer, topics);
tab = table(40000000:0,`id`time`value,[SYMBOL,TIMESTAMP,INT]);
conn = kafka::createSubJob(consumer,tab,mseed::parse,"test:0:get mseed data");
kafka::getJobStat();
kafka::cancelSubJob(conn);