Module : kafka

Module Overview

This module provides Kafka Consumer and Kafka Producer clients for interacting with Kafka brokers. It supports Kafka versions 1.x.x and 2.0.0.

For information on the operations you can perform with this module, see the Functions below. For examples of how to use these operations, see the following sections.

Basic Usages

Publishing Messages

  1. Initialize the Kafka message producer.
kafka:ProducerConfiguration producerConfiguration = {
    bootstrapServers: "localhost:9092",
    clientId: "basic-producer",
    acks: "all",
    retryCount: 3,
    valueSerializerType: kafka:SER_STRING,
    keySerializerType: kafka:SER_INT
};

kafka:Producer kafkaProducer = new (producerConfiguration);
  2. Use the kafka:Producer to publish messages.
string message = "Hello World, Ballerina";
kafka:ProducerError? result = kafkaProducer->send(message, "kafka-topic", key = 1);
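The send call returns a kafka:ProducerError on failure. A minimal sketch of checking the result (this assumes the ballerina/io module is imported for printing):

if (result is kafka:ProducerError) {
    // Publishing failed; log the error.
    io:println(result);
}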

Consuming Messages

  1. Initialize the Kafka message consumer.
kafka:ConsumerConfiguration consumerConfiguration = {
    bootstrapServers: "localhost:9092",
    groupId: "group-id",
    offsetReset: "earliest",
    topics: ["kafka-topic"]
};

kafka:Consumer consumer = new (consumerConfiguration);
  2. Use the kafka:Consumer as a simple record consumer.
kafka:ConsumerRecord[]|kafka:ConsumerError result = consumer->poll(1000);
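The poll call returns either the fetched records or a kafka:ConsumerError. A minimal sketch of handling both cases (this assumes the ballerina/io module is imported for printing; the type of each record's value depends on the configured deserializer):

if (result is kafka:ConsumerError) {
    // Polling failed; log the error.
    io:println(result);
} else {
    foreach var kafkaRecord in result {
        // The value is returned as anydata; convert it based on
        // the configured valueDeserializerType.
        anydata value = kafkaRecord.value;
        io:println(value);
    }
}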
  3. Use the kafka:Consumer as a listener.
listener kafka:Consumer consumer = new (consumerConfiguration);

service kafkaService on consumer {
    // This resource will be executed when a message is published to the
    // subscribed topic/topics.
    resource function onMessage(kafka:Consumer kafkaConsumer,
            kafka:ConsumerRecord[] records) {
    }
}

Send Data Using Avro

The Ballerina Kafka module supports Avro serialization and deserialization.

To try this, create a new Ballerina project and two modules inside it.

Execute the following commands to do this.

ballerina new kafka_avro_sample
cd kafka_avro_sample
ballerina add producer
ballerina add consumer

Dependencies

To use Avro, you need to add the necessary dependencies to the Ballerina project you created. To do so, download the required JAR files and place them inside the resources directory of the project. Then, declare those dependencies in the Ballerina.toml file of your project. The following is a sample Ballerina.toml file with the dependencies.

    [[platform.libraries]]
    module = "producer"
    path = "./resources/kafka-avro-serializer-5.4.1.jar"
    artifactId = "kafka-avro-serializer"
    version = "5.4.1"
    groupId = "io.confluent"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/kafka-schema-registry-client-5.4.1.jar"
    artifactId = "kafka-schema-registry-client"
    version = "5.4.1"
    groupId = "io.confluent"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/common-config-5.4.1.jar"
    artifactId = "common-config"
    version = "5.4.1"
    groupId = "io.confluent"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/common-utils-5.4.1.jar"
    artifactId = "common-utils"
    version = "5.4.1"
    groupId = "io.confluent"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/avro-1.9.2.jar"
    artifactId = "avro"
    version = "1.9.2"
    groupId = "org.apache.avro"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/jackson-core-2.10.3.jar"
    artifactId = "jackson-core"
    version = "2.10.3"
    groupId = "com.fasterxml.jackson.core"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/jackson-databind-2.10.3.jar"
    artifactId = "jackson-databind"
    version = "2.10.3"
    groupId = "com.fasterxml.jackson.core"

    [[platform.libraries]]
    module = "producer"
    path = "./resources/jackson-annotations-2.10.3.jar"
    artifactId = "jackson-annotations"
    version = "2.10.3"
    groupId = "com.fasterxml.jackson.core"

Now, the directory structure will look like the following (some of the files are omitted).

├── Ballerina.toml
├── resources
│   ├── avro-1.9.2.jar
│   ├── common-config-5.4.1.jar
│   ├── common-utils-5.4.1.jar
│   ├── jackson-annotations-2.10.3.jar
│   ├── jackson-core-2.10.3.jar
│   ├── jackson-databind-2.10.3.jar
│   ├── kafka-avro-serializer-5.4.1.jar
│   └── kafka-schema-registry-client-5.4.1.jar
└── src
    ├── consumer
    │   └── main.bal
    └── producer
        └── main.bal

Avro Producer

The kafka:Producer can be configured to send data using Avro by providing the following configurations.

src/producer/main.bal:

import ballerina/io;
import ballerina/kafka;

public type Person record {
    string name;
    int age;
};

kafka:ProducerConfiguration configs = {
    bootstrapServers: "<KAFKA_BROKER_HOST_AND_PORT>",
    // Other configurations
    valueSerializerType: kafka:SER_AVRO,
    schemaRegistryUrl: "<SCHEMA_REGISTRY_URL>"
};

string schema = "{\"type\" : \"record\"," +
                  "\"namespace\" : \"Thisaru\"," +
                  "\"name\" : \"person\"," +
                  "\"fields\" : [" + 
                    "{ \"name\" : \"name\", \"type\" : \"string\" }," +
                    "{ \"name\" : \"age\", \"type\" : \"int\" }" +
                  "]}";

public function main() {
    kafka:Producer producer = new(configs);

    Person person = {
        name: "Lahiru Perera",
        age: 28
    };

    kafka:AvroRecord avroRecord = {
        schemaString: schema,
        dataRecord: person
    };

    kafka:ProducerError? result = producer->send(avroRecord, "add-person");
    if (result is kafka:ProducerError) {
        io:println(result);    
    }
}

Avro Consumer

The Ballerina Kafka module currently supports Avro deserialization only for generic records. The consumer will return a kafka:AvroGenericRecord containing the data received via Avro.

The following is a sample consumer.

src/consumer/main.bal:

import ballerina/io;
import ballerina/kafka;

kafka:ConsumerConfiguration configs = {
    bootstrapServers: "<KAFKA_BROKER_HOST_AND_PORT>",
    groupId: "test-group",
    // Other configurations
    topics: ["add-person"],
    valueDeserializerType: kafka:DES_AVRO,
    schemaRegistryUrl: "<SCHEMA_REGISTRY_URL>"
};

listener kafka:Consumer consumer = new(configs);

service KafkaService on consumer {
    resource function onMessage(kafka:Consumer consumer, kafka:ConsumerRecord[] records) {
        io:println("Records received");
        foreach var kafkaRecord in records {
            anydata value = kafkaRecord.value;
            if (value is kafka:AvroGenericRecord) {
                io:println(value);
            } else {
                io:println("Invalid record type");
            }
        }
    }
}

Now, execute the following command to run the consumer:

ballerina run consumer

This starts the Kafka service and begins listening. You can verify this by the following messages, which will be displayed on the screen.

[ballerina/kafka] kafka servers: <KAFKA_BROKER_HOST_AND_PORT>
[ballerina/kafka] subscribed topics: add-person
[ballerina/kafka] started kafka listener

Then, open another terminal and execute the following command to run the producer:

ballerina run producer

Now, the consumer will receive the data and print it on the console as follows.

Records received
name=Lahiru Perera age=28

Records

AvroGenericRecord Represents a generic Avro record.
AvroRecord Defines a record to send data using Avro serialization.
ConsumerConfiguration Configurations related to the consumer endpoint.
ConsumerRecord Represents a consumer record.
Detail Represents the details of an error.
KeyStore Configurations related to the KeyStore.
PartitionOffset Represents the topic partition position in which the consumed record is stored.
ProducerConfiguration Represents the Kafka Producer configuration.
Protocols Configurations related to the SSL/TLS protocol and the versions to be used.
SecureSocket Configurations for facilitating secure communication with the Kafka server.
TopicPartition Represents a topic partition.
TrustStore Configurations related to the TrustStore.

Objects

Deserializer Represents a Kafka deserializer object.
Serializer Represents a Kafka serializer object.

Clients

Consumer Represents a Kafka consumer endpoint.
Producer Represents a Kafka producer endpoint.

Constants

DES_BYTE_ARRAY In-built Kafka byte array deserializer.
DES_STRING In-built Kafka string deserializer.
DES_INT In-built Kafka int deserializer.
DES_FLOAT In-built Kafka float deserializer.
DES_CUSTOM User-defined deserializer.
DES_AVRO Apache Avro deserializer.
ISOLATION_COMMITTED Configures the consumer to read only the committed messages in transactional mode when poll() is called.
ISOLATION_UNCOMMITTED Configures the consumer to read all the messages, even the aborted ones.
ACKS_ALL Producer acknowledgement type is 'all'.
ACKS_NONE Producer acknowledgement type is '0'.
ACKS_SINGLE Producer acknowledgement type is '1'.
SER_BYTE_ARRAY In-built Kafka Byte Array serializer.
SER_STRING In-built Kafka string serializer.
SER_INT In-built Kafka int serializer.
SER_FLOAT In-built Kafka float serializer.
SER_CUSTOM User-defined serializer.
SER_AVRO Apache Avro serializer.
COMPRESSION_NONE No compression.
COMPRESSION_GZIP Kafka GZIP compression type.
COMPRESSION_SNAPPY Kafka Snappy compression type.
COMPRESSION_LZ4 Kafka LZ4 compression type.
COMPRESSION_ZSTD Kafka ZSTD compression type.
CONSUMER_ERROR Used as the error reason for the kafka:ConsumerError type.
PRODUCER_ERROR Used as the error reason for the kafka:ProducerError type.
AVRO_ERROR Used as the error reason for the kafka:AvroError type.

Types

CompressionType Kafka compression types to compress the messages.
DeserializerType Kafka in-built deserializer type.
IsolationLevel Kafka consumer isolation level type.
ProducerAcks Kafka producer acknowledgement types.
SerializerType Kafka in-built serializer types.

Errors

AvroError Represents a Kafka Avro related error.
ConsumerError Error type specific to the kafka:Consumer object functions.
ProducerError Error type specific to the kafka:Producer object functions.