Module : kafka
Module overview
This module is used to interact with Kafka Brokers via Kafka Consumer and Kafka Producer clients. This module supports kafka 1.x.x and 2.0.0 versions.
Samples
Simple Kafka Consumer
Following is a simple service which is subscribed to a topic 'test-kafka-topic' on remote Kafka broker cluster.
import ballerina/io;
import ballerina/kafka;
import ballerina/lang. 'string;
kafka:ConsumerConfig consumerConfigs = {
bootstrapServers:"localhost:9092",
groupId:"group-id",
topics:["test-kafka-topic"],
pollingIntervalInMillis:1000
};
listener kafka:Consumer consumer = new(consumerConfigs);
service kafkaService on consumer {
resource function onMessage(kafka:ConsumerAction consumerAction,
kafka:ConsumerRecord[] records) {
// Dispatched set of Kafka records to service, We process each one by one.
foreach var kafkaRecord in records {
processKafkaRecord(kafkaRecord);
}
}
}
function processKafkaRecord(kafka:ConsumerRecord kafkaRecord) {
byte[] serializedMsg = kafkaRecord.value;
string | error msg = 'string:fromBytes(serializedMsg);
if (msg is string) {
// Print the retrieved Kafka record.
io:println("Topic: ", kafkaRecord.topic, " Received Message: ", msg);
} else {
log:printError("Error occurred while converting message data", msg);
}
}
Kafka Producer
Following is a simple program which publishes a message to 'test-kafka-topic' topic in a remote Kafka broker cluster.
import ballerina/kafka;
kafka:ProducerConfig producerConfigs = {
// Here we create a producer configs with optional parameters
// client.id - used for broker side logging.
// acks - number of acknowledgments for request complete,
// retryCount - number of retries if record send fails.
bootstrapServers: "localhost:9092",
clientId:"basic-producer",
acks:"all",
retryCount:3
};
kafka:Producer kafkaProducer = new(producerConfigs);
function main () {
string msg = "Hello World, Ballerina";
byte[] serializedMsg = msg.toByteArray("UTF-8");
var sendResult = kafkaProducer->send(serializedMsg, "test-kafka-topic");
if (sendResult is error) {
log:printError("Kafka producer failed to send data", err = sendResult);
}
}
Note: The default thread pool size used in Ballerina is number of processors available * 2. You can configure the thread pool size by using the
BALLERINA_MAX_POOL_SIZE
environment variable.
ConsumerConfig | Configuration related to consumer endpoint. |
ConsumerRecord | Type related to consumer record. |
Detail | Represents the details of an error. |
KeyStore | Record for providing key-store related configurations. |
PartitionOffset | This type represents topic partition position in which consumed record is stored. |
ProducerConfig | Struct which represents Kafka Producer configuration. |
Protocols | A record for configuring SSL/TLS protocol and version to be used. |
SecureSocket | Provides configurations for facilitating secure communication with the Kafka server. |
TopicPartition | This type represents a topic partition. |
TrustStore | Record for providing trust-store related configurations. |
Consumer | Represent a Kafka consumer endpoint. |
Producer | Represent a Kafka producer endpoint. |
CONSUMER_ERROR | |
PRODUCER_ERROR | |
ACKS_ALL | Producer acknowledgement type |
ACKS_NONE | Producer acknowledgement type |
ACKS_SINGLE | Producer acknowledgement type |
Producer_Acks | Kafka producer acknowledgement type. |
ConsumerError | Kafka Consumer related errors |
ProducerError | Kafka Producer related errors |