Module : kafka

Module overview

This module is used to interact with Kafka brokers via Kafka consumer and producer clients. It supports Kafka versions 1.x.x and 2.0.0.

Samples

Simple Kafka Consumer

Following is a simple service, which is subscribed to the 'test-kafka-topic' topic on a remote Kafka broker cluster.

import ballerina/io;
import ballerina/kafka;
import ballerina/lang.'string;
import ballerina/log;

kafka:ConsumerConfiguration consumerConfigs = {
    bootstrapServers:"localhost:9092",
    groupId:"group-id",
    topics:["test-kafka-topic"],
    pollingIntervalInMillis:1000
};

listener kafka:Consumer consumer = new(consumerConfigs);

service kafkaService on consumer {

    resource function onMessage(kafka:Consumer kafkaConsumer,
                  kafka:ConsumerRecord[] records) {
        // The set of Kafka records dispatched to the service is processed one by one.
        foreach var kafkaRecord in records {
            processKafkaRecord(kafkaRecord);
        }
    }
}

function processKafkaRecord(kafka:ConsumerRecord kafkaRecord) {
    var value = kafkaRecord.value;
    if (value is byte[]) {
        string | error msg = 'string:fromBytes(value);
        if (msg is string) {
            // Print the retrieved Kafka record.
            io:println("Topic: ", kafkaRecord.topic, " Received Message: ", msg);
        } else {
            log:printError("Error occurred while converting message data", msg);
        }
    }
}

Kafka Producer

Following is a simple program, which publishes a message to the 'test-kafka-topic' topic in a remote Kafka broker cluster.

import ballerina/kafka;
import ballerina/log;

kafka:ProducerConfiguration producerConfigs = {
    // Here, we create producer configs with optional parameters:
    // clientId - used for broker-side logging.
    // acks - number of acknowledgments required to consider a request complete.
    // retryCount - number of retries if record send fails.
    bootstrapServers: "localhost:9092",
    clientId:"basic-producer",
    acks:"all",
    retryCount:3
};

kafka:Producer kafkaProducer = new(producerConfigs);

public function main() {
    string msg = "Hello World, Ballerina";
    byte[] serializedMsg = msg.toBytes();
    var sendResult = kafkaProducer->send(serializedMsg, "test-kafka-topic");
    if (sendResult is error) {
        log:printError("Kafka producer failed to send data", err = sendResult);
    }
}

Send Data Using Avro

The Ballerina Kafka module supports Avro serialization and deserialization.

To try this, let's create a new Ballerina project and two modules inside it.

Execute the below commands to do this.

ballerina new kafka_avro_sample
cd kafka_avro_sample
ballerina add producer
ballerina add consumer

Dependencies

To use Avro, you need to add the necessary dependencies to the Ballerina project you created. To do so, download the required JAR files into the resources directory of the project, and add them as platform libraries in the Ballerina.toml file. The following is a sample Ballerina.toml file with these dependencies.

    [[platform.libraries]]
    module = "producer"
    path = "./resources/kafka-avro-serializer-5.4.1.jar"
    artifactId = "kafka-avro-serializer"
    version = "5.4.1"
    groupId = "io.confluent"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/kafka-schema-registry-client-5.4.1.jar"
    artifactId = "kafka-schema-registry-client"
    version = "5.4.1"
    groupId = "io.confluent"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/common-config-5.4.1.jar"
    artifactId = "common-config"
    version = "5.4.1"
    groupId = "io.confluent"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/common-utils-5.4.1.jar"
    artifactId = "common-utils"
    version = "5.4.1"
    groupId = "io.confluent"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/avro-1.9.2.jar"
    artifactId = "avro"
    version = "1.9.2"
    groupId = "org.apache.avro"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/jackson-core-2.10.3.jar"
    artifactId = "jackson-core"
    version = "1.9.2"
    groupId = "com.fasterxml.jackson.core"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/jackson-databind-2.10.3.jar"
    artifactId = "jackson-databind"
    version = "2.10.3"
    groupId = "com.fasterxml.jackson.core"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/jackson-annotations-2.10.3.jar"
    artifactId = "jackson-annotations"
    version = "2.10.3"
    groupId = "com.fasterxml.jackson.core"

Now, the directory structure will look as follows (some files are omitted):

├── Ballerina.toml
├── resources
│   ├── avro-1.9.2.jar
│   ├── common-config-5.4.1.jar
│   ├── common-utils-5.4.1.jar
│   ├── jackson-annotations-2.10.3.jar
│   ├── jackson-core-2.10.3.jar
│   ├── jackson-databind-2.10.3.jar
│   ├── kafka-avro-serializer-5.4.1.jar
│   └── kafka-schema-registry-client-5.4.1.jar
└── src
    ├── consumer
    │   └── main.bal
    └── producer
        └── main.bal

Avro Producer

The kafka:Producer can be configured to send data using Avro by providing the following configurations.

src/producer/main.bal:

import ballerina/io;
import ballerina/kafka;

public type Person record {
    string name;
    int age;
};

kafka:ProducerConfiguration configs = {
    bootstrapServers: "<KAFKA_BROKER_HOST_AND_PORT>",
    // Other configurations
    valueSerializerType: kafka:SER_AVRO,
    schemaRegistryUrl: "<SCHEMA_REGISTRY_URL>"
};

string schema = "{\"type\" : \"record\"," +
                  "\"namespace\" : \"Thisaru\"," +
                  "\"name\" : \"person\"," +
                  "\"fields\" : [" + 
                    "{ \"name\" : \"name\", \"type\" : \"string\" }," +
                    "{ \"name\" : \"age\", \"type\" : \"int\" }" +
                  "]}";

public function main() {
    kafka:Producer producer = new(configs);

    Person person = {
        name: "Lahiru Perera",
        age: 28
    };

    kafka:AvroRecord avroRecord = {
        schemaString: schema,
        dataRecord: person
    };

    kafka:ProducerError? result = producer->send(avroRecord, "add-person");
    if (result is kafka:ProducerError) {
        io:println(result);    
    }
}

Avro Consumer

The Ballerina Kafka module currently supports Avro deserialization only for generic records. The consumer will return a kafka:AvroGenericRecord containing the data received from Avro.

The following is a sample consumer.

src/consumer/main.bal:

import ballerina/io;
import ballerina/kafka;

kafka:ConsumerConfiguration configs = {
    bootstrapServers: "<KAFKA_BROKER_HOST_AND_PORT>",
    groupId: "test-group",
    // Other configurations
    topics: ["add-person"],
    valueDeserializerType: kafka:DES_AVRO,
    schemaRegistryUrl: "<SCHEMA_REGISTRY_URL>"
};

listener kafka:Consumer consumer = new(configs);

service KafkaService on consumer {
    resource function onMessage(kafka:Consumer consumer, kafka:ConsumerRecord[] records) {
        io:println("Records received");
        foreach var kafkaRecord in records {
            anydata value = kafkaRecord.value;
            if (value is kafka:AvroGenericRecord) {
                io:println(value);
            } else {
                io:println("Invalid record type");
            }
        }
    }
}

Now, execute the below command to run the consumer:

ballerina run consumer

This will start the Kafka listener. You can verify it by the following messages, which will be displayed in the console.

[ballerina/kafka] kafka servers: <KAFKA_BROKER_HOST_AND_PORT>
[ballerina/kafka] subscribed topics: add-person
[ballerina/kafka] started kafka listener

Then, open another terminal and execute the below command to run the producer:

ballerina run producer

Now, the consumer will receive the data, and the received data will be printed in the console as follows.

Records received
name=Lahiru Perera age=28
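
If you need to access the individual fields of the received data, following is a minimal sketch. It assumes the Avro field names ('name' and 'age' in the schema above) are available as keys of the kafka:AvroGenericRecord value.

import ballerina/io;
import ballerina/kafka;

function printPerson(kafka:AvroGenericRecord personRecord) {
    // Assumption: the Avro fields are accessible as optional map members.
    anydata name = personRecord["name"];
    anydata age = personRecord["age"];
    io:println("Name: ", name, ", Age: ", age);
}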

Records

AvroGenericRecord - Represents a generic Avro record. This is the type of the value returned from an Avro deserializer consumer.
AvroRecord - Defines a record to send data using Avro serialization.
ConsumerConfiguration - Configurations related to the consumer endpoint.
ConsumerRecord - Represents a record received from a Kafka consumer.
Detail - Represents the details of an error.
KeyStore - Record for providing key-store related configurations.
PartitionOffset - Represents the position of a consumed record within a topic partition.
ProducerConfiguration - Represents the Kafka producer configuration.
Protocols - A record for configuring the SSL/TLS protocol and versions to be used.
SecureSocket - Provides configurations for facilitating secure communication with the Kafka server.
TopicPartition - Represents a topic partition.
TrustStore - Record for providing trust-store related configurations.
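
For example, a kafka:TopicPartition and a kafka:PartitionOffset can be constructed as plain Ballerina records. Following is a minimal sketch; the field names used here (topic, partition, and offset) are assumptions based on the descriptions above.

import ballerina/kafka;

// A topic partition identified by the topic name and the partition number.
kafka:TopicPartition topicPartition = {
    topic: "test-kafka-topic",
    partition: 0
};

// An offset position within the above partition.
kafka:PartitionOffset partitionOffset = {
    partition: topicPartition,
    offset: 100
};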

Objects

Deserializer

Represents a Kafka deserializer object. This object can be used to create custom deserializers for Ballerina Kafka consumers.

Serializer

Represents a Kafka serializer object. This object can be used to create custom serializers for Ballerina Kafka producers.
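
Following is an illustrative sketch of a custom serializer. The serialize and close function names, and the valueSerializer and valueSerializerType configurations used here, are assumptions; refer to the Serializer object definition for the exact signatures. A custom deserializer can be written the same way against the Deserializer object.

import ballerina/kafka;

// A custom serializer, which upper-cases string data before sending.
public type UpperCaseStringSerializer object {
    *kafka:Serializer;

    // Assumed signature: converts the given data to a byte array.
    public function serialize(anydata data) returns byte[] {
        string stringValue = data.toString();
        return stringValue.toUpperAscii().toBytes();
    }

    // Assumed signature: releases any resources held by the serializer.
    public function close() {
    }
};

kafka:ProducerConfiguration customProducerConfigs = {
    bootstrapServers: "localhost:9092",
    valueSerializerType: kafka:SER_CUSTOM,
    valueSerializer: new UpperCaseStringSerializer()
};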

Clients

Consumer

Represents a Kafka consumer endpoint.

Producer

Represents a Kafka producer endpoint.

Constants

DES_BYTE_ARRAY

In-built Kafka byte array deserializer.

DES_STRING

In-built Kafka string deserializer.

DES_INT

In-built Kafka int deserializer.

DES_FLOAT

In-built Kafka float deserializer.

DES_CUSTOM

User-defined deserializer.

DES_AVRO

Apache Avro deserializer.

ISOLATION_COMMITTED

Consumer isolation level value 'read_committed'.

ISOLATION_UNCOMMITTED

Consumer isolation level value 'read_uncommitted'.
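
These constants can be used directly when building a consumer configuration. Following is a minimal sketch; the isolationLevel field name is an assumption, while valueDeserializerType is shown in the Avro consumer sample above.

import ballerina/kafka;

kafka:ConsumerConfiguration stringConsumerConfigs = {
    bootstrapServers: "localhost:9092",
    groupId: "string-consumer-group",
    topics: ["test-kafka-topic"],
    // Use the in-built string deserializer for record values.
    valueDeserializerType: kafka:DES_STRING,
    // Only read records that have been committed (assumed field name).
    isolationLevel: kafka:ISOLATION_COMMITTED
};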

ACKS_ALL

Producer acknowledgement type 'all'. This will guarantee that the record will not be lost as long as at least one in-sync replica is alive.

ACKS_NONE

Producer acknowledgement type '0'. If the acknowledgement type is set to this, the producer will not wait for any acknowledgement from the server.

ACKS_SINGLE

Producer acknowledgement type '1'. If the acknowledgement type is set to this, the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers.

SER_BYTE_ARRAY

In-built Kafka byte array serializer.

SER_STRING

In-built Kafka string serializer.

SER_INT

In-built Kafka int serializer.

SER_FLOAT

In-built Kafka float serializer.

SER_CUSTOM

User-defined serializer.

SER_AVRO

Apache Avro serializer.

COMPRESSION_NONE

Kafka compression type. Value: 'none'.

COMPRESSION_GZIP

Kafka compression type. Value: 'gzip'.

COMPRESSION_SNAPPY

Kafka compression type. Value: 'snappy'.

COMPRESSION_LZ4

Kafka compression type. Value: 'lz4'.

COMPRESSION_ZSTD

Kafka compression type. Value: 'zstd'.
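
Similarly, the acknowledgement and compression constants can be used when building a producer configuration. Following is a minimal sketch; the compressionType field name is an assumption, while acks is shown in the producer sample above.

import ballerina/kafka;

kafka:ProducerConfiguration compressedProducerConfigs = {
    bootstrapServers: "localhost:9092",
    // Wait for all in-sync replicas to acknowledge each record.
    acks: kafka:ACKS_ALL,
    retryCount: 3,
    // Compress record batches with Snappy (assumed field name).
    compressionType: kafka:COMPRESSION_SNAPPY
};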

CONSUMER_ERROR

Defines a Kafka consumer related error.

PRODUCER_ERROR

Defines a Kafka producer related error.

AVRO_ERROR

Defines an Avro serialization/deserialization error.

Types

CompressionType

Kafka compression types used to compress messages.

DeserializerType

Kafka in-built deserializer type.

IsolationLevel

Kafka consumer isolation level type.

ProducerAcks

Kafka producer acknowledgement type.

SerializerType

Kafka in-built serializer type.

Errors

AvroError

Represents a Kafka Avro related error.

ConsumerError

Represents a Kafka consumer related error.

ProducerError

Represents a Kafka producer related error.