Module : kafka

Package overview

The Ballerina Kafka Connector is used to connect Ballerina with Kafka brokers. With the Kafka Connector, Ballerina can act as a Kafka consumer or a Kafka producer. This connector supports Kafka 1.x.x and 2.0.0 versions.

Samples

Simple Kafka Consumer

The following is a simple service that subscribes to the topic 'test-kafka-topic' on a remote Kafka broker cluster.

import ballerina/kafka;
import ballerina/encoding;
import ballerina/io;

kafka:ConsumerConfig consumerConfigs = {
    bootstrapServers: "localhost:9092",
    groupId: "group-id",
    topics: ["test-kafka-topic"],
    pollingIntervalInMillis: 1000
};

listener kafka:Consumer consumer = new(consumerConfigs);

service kafkaService on consumer {

    resource function onMessage(kafka:ConsumerAction consumerAction, kafka:ConsumerRecord[] records) {
        // The set of Kafka records dispatched to the service is processed one by one.
        foreach var kafkaRecord in records {
            processKafkaRecord(kafkaRecord);
        }
    }
}

function processKafkaRecord(kafka:ConsumerRecord kafkaRecord) {
    byte[] serializedMsg = kafkaRecord.value;
    string msg = encoding:byteArrayToString(serializedMsg);
    // Print the retrieved Kafka record.
    io:println("Topic: " + kafkaRecord.topic + " Received Message: " + msg);
}

Please find the consumer configuration parameters below:

| Parameter | Description |
| :--- | :--- |
| bootstrapServers | The list of host and port pairs, which are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster. |
| groupId | The identifier of the consumer group. |
| topics | The topics to which this consumer subscribes. |
| pollingIntervalInMillis | The time interval at which the consumer polls the topic. |
| offsetReset | The offset reset strategy to use if there is no initial offset. |
| partitionAssignmentStrategy | The strategy class for handling the partition assignment among consumers. |
| metricsRecordingLevel | The metrics recording level. |
| metricsReporterClasses | The metrics reporter classes. |
| clientId | The identifier to be used for server-side logging. |
| interceptorClasses | The interceptor classes to be used before sending records. |
| isolationLevel | The transactional message reading method. Use "read_committed" to read only committed messages in transactional mode when poll() is called. Use "read_uncommitted" to read all the messages, even the aborted ones. |
| properties | Additional properties, if required. |
| sessionTimeoutInMillis | The timeout used to detect consumer failures when the heartbeat threshold is reached. |
| heartBeatIntervalInMillis | The expected time between heartbeats. |
| metadataMaxAgeInMillis | The maximum time to force a refresh of metadata. |
| autoCommitIntervalInMillis | The offset auto-commit interval, when auto-committing is enabled. |
| maxPartitionFetchBytes | The maximum amount of data per partition the server returns. |
| sendBuffer | The size of the TCP send buffer (SO_SNDBUF). |
| receiveBuffer | The size of the TCP receive buffer (SO_RCVBUF). |
| fetchMinBytes | The minimum amount of data the server should return for a fetch request. |
| fetchMaxBytes | The maximum amount of data the server should return for a fetch request. |
| fetchMaxWaitTimeInMillis | The maximum amount of time the server will block before answering a fetch request. |
| reconnectBackoffTimeMaxInMillis | The maximum amount of time, in milliseconds, to wait when reconnecting. |
| retryBackoffInMillis | The time to wait before attempting to retry a failed request. |
| metricsSampleWindowInMillis | The window of time over which a metrics sample is computed. |
| metricsNumSamples | The number of samples maintained to compute metrics. |
| requestTimeoutInMillis | The time to wait for the response of a request. |
| connectionMaxIdleTimeInMillis | Close idle connections after this number of milliseconds. |
| maxPollRecords | The maximum number of records returned in a single call to poll. |
| maxPollInterval | The maximum delay between invocations of poll. |
| reconnectBackoffTimeInMillis | The time to wait before attempting to reconnect. |
| pollingTimeoutInMillis | The timeout interval for polling. |
| concurrentConsumers | The number of concurrent consumers. |
| defaultApiTimeoutInMillis | The default API timeout value for APIs with a duration. |
| autoCommit | Enables auto-committing offsets. |
| checkCRCS | Whether to check the CRC32 of the consumed records. |
| excludeInternalTopics | Whether records from internal topics should be exposed to the consumer. |
| decoupleProcessing | Decouples processing. |
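As a sketch, several of these parameters can be combined in a single `ConsumerConfig`. The values below are illustrative only, and the record field names are assumed to match the parameter names in the table above:

```ballerina
import ballerina/kafka;

// An illustrative consumer configuration combining several of the
// parameters described above. All values are examples, not recommendations.
kafka:ConsumerConfig detailedConsumerConfigs = {
    // Comma-separated list of bootstrap broker addresses.
    bootstrapServers: "localhost:9092,localhost:9093",
    groupId: "order-processors",
    topics: ["orders"],
    pollingIntervalInMillis: 500,
    // Start from the earliest offset when no committed offset exists.
    offsetReset: "earliest",
    sessionTimeoutInMillis: 10000,
    // Disable auto-commit so offsets can be committed manually.
    autoCommit: false
};
```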

Kafka Producer

The following is a simple program that publishes a message to the 'test-kafka-topic' topic in a remote Kafka broker cluster.

import ballerina/kafka;
import ballerina/log;

kafka:ProducerConfig producerConfigs = {
    // Here we create producer configs with the optional parameters:
    // clientId - used for broker-side logging,
    // acks - the number of acknowledgments required for the request to complete,
    // noRetries - the number of retries if sending a record fails.
    bootstrapServers: "localhost:9092",
    clientId: "basic-producer",
    acks: "all",
    noRetries: 3
};

kafka:Producer kafkaProducer = new(producerConfigs);

public function main() {
    string msg = "Hello World, Ballerina";
    byte[] serializedMsg = msg.toByteArray("UTF-8");
    var sendResult = kafkaProducer->send(serializedMsg, "test-kafka-topic");
    if (sendResult is error) {
        log:printError("Kafka producer failed to send data", err = sendResult);
    }
}

Please find the producer configuration parameters below:

| Parameter | Description |
| :--- | :--- |
| bootstrapServers | The list of remote server endpoints of the Kafka brokers. |
| acks | The number of acknowledgments. |
| compressionType | The compression type to be used for messages. |
| clientId | The identifier to be used for server-side logging. |
| metricsRecordingLevel | The metrics recording level. |
| metricReporterClasses | The metrics reporter classes. |
| partitionerClass | The partitioner class to be used to select the partition to which the message is sent. |
| interceptorClasses | The interceptor classes to be used before sending records. |
| transactionalId | The transactional ID to be used in transactional delivery. |
| bufferMemory | The total bytes of memory the producer can use to buffer records. |
| noRetries | The number of retries to resend a record. |
| batchSize | The number of records to be batched for a single request. Use 0 for no batching. |
| linger | The delay to allow other records to be batched. |
| sendBuffer | The size of the TCP send buffer (SO_SNDBUF). |
| receiveBuffer | The size of the TCP receive buffer (SO_RCVBUF). |
| maxRequestSize | The maximum size of a request in bytes. |
| reconnectBackoffTimeInMillis | The time to wait before attempting to reconnect. |
| reconnectBackoffMaxTimeInMillis | The maximum amount of time, in milliseconds, to wait when reconnecting. |
| retryBackoffTimeInMillis | The time to wait before attempting to retry a failed request. |
| maxBlock | The maximum time the send operation blocks when the buffer is full. |
| requestTimeoutInMillis | The time to wait for the response of a request. |
| metadataMaxAgeInMillis | The maximum time to force a refresh of metadata. |
| metricsSampleWindowInMillis | The window of time over which a metrics sample is computed. |
| metricsNumSamples | The number of samples maintained to compute metrics. |
| maxInFlightRequestsPerConnection | The maximum number of unacknowledged requests on a single connection. |
| connectionsMaxIdleTimeInMillis | Close idle connections after this number of milliseconds. |
| transactionTimeoutInMillis | The timeout for a transaction status update from the producer. |
| enableIdempotence | When enabled, exactly one copy of each message is written to the stream. |
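Several of these parameters can likewise be combined in one `ProducerConfig`. The sketch below is illustrative; the values are examples only, and the field names are assumed to match the parameter table above:

```ballerina
import ballerina/kafka;

// An illustrative producer configuration combining several of the
// parameters described above. All values are examples, not recommendations.
kafka:ProducerConfig detailedProducerConfigs = {
    bootstrapServers: "localhost:9092",
    clientId: "order-publisher",
    // Wait for all in-sync replicas to acknowledge each record.
    acks: "all",
    noRetries: 5,
    // Batch up to 16384 bytes of records into a single request.
    batchSize: 16384,
    // Limit in-flight requests to one to preserve ordering under retries.
    maxInFlightRequestsPerConnection: 1
};
```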

Records

ConsumerConfig Configuration related to the consumer endpoint.
ConsumerRecord Type related to a consumer record.
Detail Record representing the details of an error.
KeyStore Record for providing key-store related configurations.
PartitionOffset Represents the topic partition position in which a consumed record is stored.
ProducerConfig Record which represents the Kafka Producer configuration.
Protocols A record for configuring the SSL/TLS protocol and version to be used.
SecureSocket Provides configurations for facilitating secure communication with the Kafka server.
TopicPartition Represents a topic partition.
TrustStore Record for providing trust-store related configurations.

Clients

Consumer

Represents a Kafka consumer endpoint.

Producer

Represents a Kafka producer endpoint.

Constants

CONSUMER_ERROR
PRODUCER_ERROR
ACKS_ALL

Producer acknowledgement type 'all'. This guarantees that the record will not be lost as long as at least one in-sync replica is alive.

ACKS_NONE

Producer acknowledgement type '0'. If the acknowledgement type is set to this, the producer will not wait for any acknowledgement from the server.

ACKS_SINGLE

Producer acknowledgement type '1'. If the acknowledgement type is set to this, the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers.
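These constants can presumably be used in place of the corresponding string literals when building a `ProducerConfig`; the sketch below assumes the `acks` field accepts them directly:

```ballerina
import ballerina/kafka;

// Using the ACKS_ALL constant instead of the string literal "all".
// The configuration values here are illustrative only.
kafka:ProducerConfig safeProducerConfigs = {
    bootstrapServers: "localhost:9092",
    acks: kafka:ACKS_ALL
};
```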

Types

Producer_Acks

Kafka producer acknowledgement type.

Errors

ConsumerError

Kafka consumer related errors.

ProducerError

Kafka producer related errors.