Module : kafka
Module overview
This module is used to interact with Kafka brokers via Kafka consumer and producer clients. It supports Kafka versions 1.x.x and 2.0.0.
Samples
Simple Kafka Consumer
The following is a simple service that subscribes to the topic 'test-kafka-topic' on a remote Kafka broker cluster.
import ballerina/io;
import ballerina/kafka;
import ballerina/lang.'string;
import ballerina/log;

kafka:ConsumerConfiguration consumerConfigs = {
    bootstrapServers: "localhost:9092",
    groupId: "group-id",
    topics: ["test-kafka-topic"],
    pollingIntervalInMillis: 1000
};

listener kafka:Consumer consumer = new(consumerConfigs);

service kafkaService on consumer {
    resource function onMessage(kafka:ConsumerAction consumerAction,
            kafka:ConsumerRecord[] records) {
        // Process each record in the set of Kafka records dispatched to the service.
        foreach var kafkaRecord in records {
            processKafkaRecord(kafkaRecord);
        }
    }
}

function processKafkaRecord(kafka:ConsumerRecord kafkaRecord) {
    var value = kafkaRecord.value;
    if (value is byte[]) {
        string|error msg = 'string:fromBytes(value);
        if (msg is string) {
            // Print the retrieved Kafka record.
            io:println("Topic: ", kafkaRecord.topic, " Received Message: ", msg);
        } else {
            log:printError("Error occurred while converting message data", err = msg);
        }
    }
}
Kafka Producer
The following is a simple program that publishes a message to the 'test-kafka-topic' topic in a remote Kafka broker cluster.
import ballerina/kafka;
import ballerina/log;

kafka:ProducerConfiguration producerConfigs = {
    // Producer configuration with optional parameters:
    // clientId - used for broker-side logging.
    // acks - number of acknowledgments required to consider a request complete.
    // retryCount - number of retries if sending a record fails.
    bootstrapServers: "localhost:9092",
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3
};

kafka:Producer kafkaProducer = new(producerConfigs);

public function main() {
    string msg = "Hello World, Ballerina";
    byte[] serializedMsg = msg.toBytes();
    var sendResult = kafkaProducer->send(serializedMsg, "test-kafka-topic");
    if (sendResult is error) {
        log:printError("Kafka producer failed to send data", err = sendResult);
    }
}
Send Data Using Avro
The Ballerina Kafka module supports Avro serialization and deserialization.
To try this, let's create a new Ballerina project and two modules inside it.
Execute the below commands to do this.
ballerina new kafka_avro_sample
cd kafka_avro_sample
ballerina add producer
ballerina add consumer
Dependencies
To use Avro, you need to add the necessary dependencies to the Ballerina project you created. To do so, download the required JAR files and place them inside the resources directory. Then, add those dependencies to the Ballerina.toml file of your project. The following is a sample Ballerina.toml file with the dependencies.
[[platform.libraries]]
module = "producer"
path = "./resources/kafka-avro-serializer-5.4.1.jar"
artifactId = "kafka-avro-serializer"
version = "5.4.1"
groupId = "io.confluent"
[[platform.libraries]]
module = "producer"
path = "./resources/kafka-schema-registry-client-5.4.1.jar"
artifactId = "kafka-schema-registry-client"
version = "5.4.1"
groupId = "io.confluent"
[[platform.libraries]]
module = "producer"
path = "./resources/common-config-5.4.1.jar"
artifactId = "common-config"
version = "5.4.1"
groupId = "io.confluent"
[[platform.libraries]]
module = "producer"
path = "./resources/common-utils-5.4.1.jar"
artifactId = "common-utils"
version = "5.4.1"
groupId = "io.confluent"
[[platform.libraries]]
module = "producer"
path = "./resources/avro-1.9.2.jar"
artifactId = "avro"
version = "1.9.2"
groupId = "org.apache.avro"
[[platform.libraries]]
module = "producer"
path = "./resources/jackson-core-2.10.3.jar"
artifactId = "jackson-core"
version = "2.10.3"
groupId = "com.fasterxml.jackson.core"
[[platform.libraries]]
module = "producer"
path = "./resources/jackson-databind-2.10.3.jar"
artifactId = "jackson-databind"
version = "2.10.3"
groupId = "com.fasterxml.jackson.core"
[[platform.libraries]]
module = "producer"
path = "./resources/jackson-annotations-2.10.3.jar"
artifactId = "jackson-annotations"
version = "2.10.3"
groupId = "com.fasterxml.jackson.core"
Now, the directory structure will look as follows (some files are omitted).
├── Ballerina.toml
├── resources
│ ├── avro-1.9.2.jar
│ ├── common-config-5.4.1.jar
│ ├── common-utils-5.4.1.jar
│ ├── jackson-annotations-2.10.3.jar
│ ├── jackson-core-2.10.3.jar
│ ├── jackson-databind-2.10.3.jar
│ ├── kafka-avro-serializer-5.4.1.jar
│ └── kafka-schema-registry-client-5.4.1.jar
└── src
├── consumer
│ └── main.bal
└── producer
└── main.bal
Avro Producer
The kafka:Producer can be configured to send data using Avro by providing the following configurations.
schemaString: The schema string, which is used to define the Avro schema.
dataRecord: The data record you want to send through Kafka.
src/producer/main.bal:
import ballerina/io;
import ballerina/kafka;

public type Person record {
    string name;
    int age;
};

kafka:ProducerConfiguration configs = {
    bootstrapServers: "<KAFKA_BROKER_HOST_AND_PORT>",
    // Other configurations
    valueSerializerType: kafka:SER_AVRO,
    schemaRegistryUrl: "<SCHEMA_REGISTRY_URL>"
};

string schema = "{\"type\" : \"record\"," +
    "\"namespace\" : \"Thisaru\"," +
    "\"name\" : \"person\"," +
    "\"fields\" : [" +
    "{ \"name\" : \"name\", \"type\" : \"string\" }," +
    "{ \"name\" : \"age\", \"type\" : \"int\" }" +
    "]}";

public function main() {
    kafka:Producer producer = new(configs);
    Person person = {
        name: "Lahiru Perera",
        age: 28
    };
    kafka:AvroRecord avroRecord = {
        schemaString: schema,
        dataRecord: person
    };
    kafka:ProducerError? result = producer->send(avroRecord, "add-person");
    if (result is kafka:ProducerError) {
        io:println(result);
    }
}
Avro Consumer
The Kafka implementation of Ballerina currently supports Avro deserialization only for generic records. The consumer will return a kafka:AvroGenericRecord containing the data received via Avro. The following is a sample consumer.
src/consumer/main.bal:
import ballerina/io;
import ballerina/kafka;

kafka:ConsumerConfiguration configs = {
    bootstrapServers: "<KAFKA_BROKER_HOST_AND_PORT>",
    groupId: "test-group",
    // Other configurations
    topics: ["add-person"],
    valueDeserializerType: kafka:DES_AVRO,
    schemaRegistryUrl: "<SCHEMA_REGISTRY_URL>"
};

listener kafka:Consumer consumer = new(configs);

service KafkaService on consumer {
    resource function onMessage(kafka:Consumer consumer, kafka:ConsumerRecord[] records) {
        io:println("Records received");
        foreach var kafkaRecord in records {
            anydata value = kafkaRecord.value;
            if (value is kafka:AvroGenericRecord) {
                io:println(value);
            } else {
                io:println("Invalid record type");
            }
        }
    }
}
Now, execute the below command to run the consumer:
ballerina run consumer
This will start the Kafka service as a listener. You can verify that it is running by the following messages, which will be displayed on the screen.
[ballerina/kafka] kafka servers: <KAFKA_BROKER_HOST_AND_PORT>
[ballerina/kafka] subscribed topics: add-person
[ballerina/kafka] started kafka listener
Then, open another terminal and execute the below command to run the producer:
ballerina run producer
Now, the consumer will receive the data, and the received data will be printed on the console as follows.
Records received
name=Lahiru Perera age=28
AvroGenericRecord | Represents a generic Avro record. This is the type of the value returned from an Avro deserializer consumer. |
AvroRecord | Defines a record to send data using Avro serialization. |
ConsumerConfiguration | Configuration related to consumer endpoint. |
ConsumerRecord | Type related to consumer record. |
Detail | Represents the details of an error. |
KeyStore | Record for providing key-store related configurations. |
PartitionOffset | This type represents the topic partition position in which a consumed record is stored. |
ProducerConfiguration | Record which represents the Kafka Producer configuration. |
Protocols | A record for configuring SSL/TLS protocol and version to be used. |
SecureSocket | Provides configurations for facilitating secure communication with the Kafka server. |
TopicPartition | This type represents a topic partition. |
TrustStore | Record for providing trust-store related configurations. |
Deserializer | Represents a Kafka deserializer object. This object can be used to create custom deserializers for Ballerina Kafka consumers. |
Serializer | Represents a Kafka serializer object. This object can be used to create custom serializers for Ballerina Kafka producers. |
Consumer | Represents a Kafka consumer endpoint. |
Producer | Represents a Kafka producer endpoint. |
DES_BYTE_ARRAY | In-built Kafka byte array deserializer. |
DES_STRING | In-built Kafka string deserializer. |
DES_INT | In-built Kafka int deserializer. |
DES_FLOAT | In-built Kafka float deserializer. |
DES_CUSTOM | User-defined deserializer. |
DES_AVRO | Apache Avro deserializer. |
ISOLATION_COMMITTED | Consumer isolation level value 'read_committed'. |
ISOLATION_UNCOMMITTED | Consumer isolation level value 'read_uncommitted'. |
ACKS_ALL | Producer acknowledgement type 'all'. This guarantees that the record will not be lost as long as at least one in-sync replica is alive. |
ACKS_NONE | Producer acknowledgement type '0'. If the acknowledgement type is set to this, the producer will not wait for any acknowledgement from the server. |
ACKS_SINGLE | Producer acknowledgement type '1'. If the acknowledgement type is set to this, the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. |
SER_BYTE_ARRAY | In-built Kafka Byte Array serializer. |
SER_STRING | In-built Kafka string serializer. |
SER_INT | In-built Kafka int serializer. |
SER_FLOAT | In-built Kafka float serializer. |
SER_CUSTOM | User-defined serializer. |
SER_AVRO | Apache Avro serializer. |
COMPRESSION_NONE | Kafka compression type. Value 'none'. |
COMPRESSION_GZIP | Kafka compression type. Value 'gzip'. |
COMPRESSION_SNAPPY | Kafka compression type. Value 'snappy'. |
COMPRESSION_LZ4 | Kafka compression type. Value 'lz4'. |
COMPRESSION_ZSTD | Kafka compression type. Value 'zstd'. |
CONSUMER_ERROR | Defines a Kafka consumer-related error. |
PRODUCER_ERROR | Defines a Kafka producer-related error. |
AVRO_ERROR | Defines an Avro serialization/deserialization error. |
CompressionType | Kafka compression type used to compress messages. |
DeserializerType | Kafka in-built deserializer type. |
IsolationLevel | Kafka consumer isolation level type. |
ProducerAcks | Kafka producer acknowledgement type. |
SerializerType | Kafka in-built serializer type. |
AvroError | Represents a Kafka Avro-related error. |
ConsumerError | Represents a Kafka consumer-related error. |
ProducerError | Represents a Kafka producer-related error. |
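Several of the constants listed above can be used in the client configuration records in place of string literals. The following is a hedged sketch of such a configuration; note that the compressionType and isolationLevel field names are assumptions inferred from the CompressionType and IsolationLevel types above and are not confirmed by this document.

```ballerina
import ballerina/kafka;

// Producer configuration using the ACKS_ALL and COMPRESSION_SNAPPY constants.
// Note: the `compressionType` field name is an assumption based on the
// CompressionType type listed above.
kafka:ProducerConfiguration producerConfigs = {
    bootstrapServers: "localhost:9092",
    clientId: "constant-demo-producer",
    acks: kafka:ACKS_ALL,
    compressionType: kafka:COMPRESSION_SNAPPY,
    retryCount: 3
};

// Consumer configuration using the DES_STRING and ISOLATION_COMMITTED constants.
// Note: the `isolationLevel` field name is likewise an assumption.
kafka:ConsumerConfiguration consumerConfigs = {
    bootstrapServers: "localhost:9092",
    groupId: "constant-demo-group",
    topics: ["test-kafka-topic"],
    valueDeserializerType: kafka:DES_STRING,
    isolationLevel: kafka:ISOLATION_COMMITTED
};
```

Using the constants instead of raw strings such as "all" or "snappy" lets the compiler catch typos in these configuration values.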