Clients - kafka : Consumer

Represents a Kafka consumer endpoint.

Constructor

__init

(ConsumerConfiguration config)
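
As a rough illustration of the constructor, the sketch below builds a consumer from a configuration record and closes it again. The import path `ballerina/kafka` and the field names `bootstrapServers` and `groupId` are assumptions that do not appear on this page; check them against the ConsumerConfiguration record of the version in use.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Assumed field names; verify them against the ConsumerConfiguration record.
kafka:ConsumerConfiguration consumerConfig = {
    bootstrapServers: "localhost:9092",
    groupId: "example-group"
};

public function main() {
    // Pass the configuration to the constructor.
    kafka:Consumer consumer = new (consumerConfig);

    // Release the connection once the consumer is no longer needed.
    kafka:ConsumerError? result = consumer->close();
    if (result is kafka:ConsumerError) {
        io:println("Failed to close the consumer: ", result);
    }
}
```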

Remote Methods

assign Assigns the consumer to a set of topic partitions.
close Closes the consumer connection to the external Kafka broker.
 kafka:ConsumerError? result = consumer->close();
commit Commits the currently consumed offsets for the consumer.
 kafka:ConsumerError? result = consumer->commit();
commitOffset Commits the given offsets for the given topic partitions.
connect Connects the consumer to the host provided in the consumer configuration.
 kafka:ConsumerError? result = consumer->connect();
getAssignment Retrieves the currently assigned partitions for the consumer.
 kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getAssignment();
getAvailableTopics Retrieves the available list of topics for a particular consumer.
 string[]|kafka:ConsumerError result = consumer->getAvailableTopics();
getBeginningOffsets Retrieves the start offsets for the given set of partitions.
getCommittedOffset Retrieves the last committed offset for the given topic partition.
getEndOffsets Retrieves the last offsets for the given set of partitions.
getPausedPartitions Retrieves the partitions that are currently paused.
 kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getPausedPartitions();
getPositionOffset Retrieves the offset of the next record that will be fetched, if a record exists in that position.
getSubscription Retrieves the set of topics to which the consumer is currently subscribed.
 string[]|kafka:ConsumerError result = consumer->getSubscription();
getTopicPartitions Retrieves the set of partitions that belong to the topic.
 kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getTopicPartitions("kafka-topic");
pause Pauses retrieving messages from a set of partitions.
poll Polls the external broker for records.
 kafka:ConsumerRecord[]|kafka:ConsumerError result = consumer->poll(1000);
resume Resumes retrieving messages from a set of partitions that were paused earlier.
seek Seeks to a given offset in a topic partition.
seekToBeginning Seeks to the beginning of the offsets for the given set of topic partitions.
seekToEnd Seeks to the end of the offsets for the given set of topic partitions.
subscribe Subscribes the consumer to the provided set of topics.
 kafka:ConsumerError? result = consumer->subscribe(["kafka-topic-1", "kafka-topic-2"]);
subscribeToPattern Subscribes the consumer to the topics that match the provided pattern.
 kafka:ConsumerError? result = consumer->subscribeToPattern("kafka.*");
subscribeWithPartitionRebalance Subscribes to the provided set of topics with rebalance listening enabled. This function can be used inside a service to subscribe to a set of topics while rebalancing the partition assignment of the consumers.
unsubscribe Unsubscribes from all the topic subscriptions.
 kafka:ConsumerError? result = consumer->unsubscribe();

Methods

__start Starts the registered services.
__gracefulStop Stops the Kafka listener gracefully.
__immediateStop Stops the Kafka listener immediately.
__attach Gets called every time a service attaches itself to the listener.
__detach Detaches a consumer service from the listener.

Fields

  • consumerConfig ConsumerConfiguration? (default ())
  • Used to store configurations related to a Kafka connection

assign

(TopicPartition[] partitions)

returns ConsumerError?
Assigns the consumer to a set of topic partitions.

Parameters

  • partitions TopicPartition[]
  • Topic partitions to be assigned

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else nil

close

(int duration)

returns ConsumerError?
Closes the consumer connection to the external Kafka broker.
 kafka:ConsumerError? result = consumer->close();

Parameters

  • duration int (default -1)
  • Timeout duration for the close operation execution

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else nil

commit

()

returns ConsumerError?
Commits the currently consumed offsets for the consumer.
 kafka:ConsumerError? result = consumer->commit();
  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else nil

commitOffset

(PartitionOffset[] offsets, int duration)

returns ConsumerError?
Commits the given offsets for the given topic partitions.

Parameters

  • offsets PartitionOffset[]
  • Offsets to be committed

  • duration int (default -1)
  • Timeout duration for the commit operation execution

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else nil
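
To make the shape of the `offsets` argument concrete, here is a minimal sketch of committing one explicit offset. It assumes that `TopicPartition` has the fields `topic` and `partition` and that `PartitionOffset` has the fields `partition` and `offset`; neither record is described on this page, and `commitSingleOffset` is a hypothetical helper.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Hypothetical helper: commits a single, explicitly chosen offset.
function commitSingleOffset(kafka:Consumer consumer) {
    // Assumed record shapes: TopicPartition {topic, partition} and
    // PartitionOffset {partition, offset}.
    kafka:TopicPartition topicPartition = {topic: "kafka-topic", partition: 0};
    kafka:PartitionOffset partitionOffset = {partition: topicPartition, offset: 10};

    // duration is omitted, so its default of -1 applies.
    kafka:ConsumerError? result = consumer->commitOffset([partitionOffset]);
    if (result is kafka:ConsumerError) {
        io:println("Commit failed: ", result);
    }
}
```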

connect

()

returns ConsumerError?
Connects the consumer to the host provided in the consumer configuration.
 kafka:ConsumerError? result = consumer->connect();
  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else nil

getAssignment

()

returns TopicPartition[] | ConsumerError
Retrieves the currently assigned partitions for the consumer.
 kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getAssignment();
  • Return Type

    (TopicPartition[] | ConsumerError)
  • Array of assigned partitions for the consumer if executes successfully or else kafka:ConsumerError

getAvailableTopics

(int duration)

returns string[] | ConsumerError
Retrieves the available list of topics for a particular consumer.
 string[]|kafka:ConsumerError result = consumer->getAvailableTopics();

Parameters

  • duration int (default -1)
  • Timeout duration for the get available topics execution

  • Return Type

    (string[] | ConsumerError)
  • Array of topics currently available (authorized) for the consumer to subscribe or else kafka:ConsumerError

getBeginningOffsets

(TopicPartition[] partitions, int duration)

returns PartitionOffset[] | ConsumerError
Retrieves the start offsets for the given set of partitions.

Parameters

  • partitions TopicPartition[]
  • Array of topic partitions to get the starting offsets

  • duration int (default -1)
  • Timeout duration for the get beginning offsets execution

  • Return Type

    (PartitionOffset[] | ConsumerError)
  • Starting offsets for the given partitions if executes successfully or else kafka:ConsumerError

getCommittedOffset

(TopicPartition partition, int duration)

returns PartitionOffset | ConsumerError
Retrieves the last committed offset for the given topic partition.

Parameters

  • partition TopicPartition
  • The TopicPartition for which the committed offset is returned

  • duration int (default -1)
  • Timeout duration for the get committed offset operation to execute

  • Return Type

    (PartitionOffset | ConsumerError)
  • Committed offset for the consumer for the given partition if executes successfully or else kafka:ConsumerError

getEndOffsets

(TopicPartition[] partitions, int duration)

returns PartitionOffset[] | ConsumerError
Retrieves the last offsets for the given set of partitions.

Parameters

  • partitions TopicPartition[]
  • Set of partitions to get the last offsets

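  • duration int (default -1)
  • Timeout duration for the get end offsets operation to execute

  • Return Type

    (PartitionOffset[] | ConsumerError)
  • End offsets for the given partitions if executes successfully or else kafka:ConsumerError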

getPausedPartitions

()

returns TopicPartition[] | ConsumerError
Retrieves the partitions that are currently paused.
 kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getPausedPartitions();
  • Return Type

    (TopicPartition[] | ConsumerError)
  • Set of partitions paused from message retrieval if executes successfully or else kafka:ConsumerError

getPositionOffset

(TopicPartition partition, int duration)

returns int | ConsumerError
Retrieves the offset of the next record that will be fetched, if a record exists in that position.

Parameters

  • partition TopicPartition
  • The TopicPartition for which the position is required

  • duration int (default -1)
  • Timeout duration for the get position offset operation to execute

  • Return Type

    (int | ConsumerError)
  • Offset which will be fetched next (if a record exists at that offset) or else kafka:ConsumerError if the operation fails

getSubscription

()

returns string[] | ConsumerError
Retrieves the set of topics to which the consumer is currently subscribed.
 string[]|kafka:ConsumerError result = consumer->getSubscription();
  • Return Type

    (string[] | ConsumerError)
  • Array of subscribed topics for the consumer if executes successfully or else kafka:ConsumerError

getTopicPartitions

(string topic, int duration)

returns TopicPartition[] | ConsumerError
Retrieves the set of partitions that belong to the topic.
 kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getTopicPartitions("kafka-topic");

Parameters

  • topic string
  • The topic for which the partition information is needed

  • duration int (default -1)
  • Timeout duration for the get topic partitions operation to execute

  • Return Type

    (TopicPartition[] | ConsumerError)
  • Array of partitions for the given topic if executes successfully or else kafka:ConsumerError

pause

(TopicPartition[] partitions)

returns ConsumerError?
Pauses retrieving messages from a set of partitions.

Parameters

  • partitions TopicPartition[]
  • Partitions to pause the retrieval of messages

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else nil

poll

(int timeoutValue)

returns ConsumerRecord[] | ConsumerError
Polls the external broker for records.
 kafka:ConsumerRecord[]|kafka:ConsumerError result = consumer->poll(1000);

Parameters

  • timeoutValue int
  • Polling time in milliseconds
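
Because `poll` returns a union of records and an error, callers typically branch on the result type. The following is a minimal sketch of one poll followed by a commit; `pollAndCommit` is a hypothetical helper and the `ballerina/kafka` import path is an assumption.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Hypothetical helper: polls once and commits if the poll succeeded.
function pollAndCommit(kafka:Consumer consumer) {
    // Wait up to 1000 milliseconds for records.
    var result = consumer->poll(1000);
    if (result is kafka:ConsumerError) {
        io:println("Poll failed: ", result);
    } else {
        // In this branch the result is narrowed to kafka:ConsumerRecord[].
        io:println("Received ", result.length(), " record(s)");

        // Commit the offsets consumed so far.
        kafka:ConsumerError? commitResult = consumer->commit();
        if (commitResult is kafka:ConsumerError) {
            io:println("Commit failed: ", commitResult);
        }
    }
}
```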

resume

(TopicPartition[] partitions)

returns ConsumerError?
Resumes retrieving messages from a set of partitions that were paused earlier.

Parameters

  • partitions TopicPartition[]
  • Partitions to resume the retrieval of messages

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else ()
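
`pause` and `resume` take the same kind of partition array, so one way to use them together is to pause everything currently assigned and resume it later. This is only a sketch; `pauseThenResume` is a hypothetical helper and the `ballerina/kafka` import path is an assumption.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Hypothetical helper: pauses all assigned partitions, then resumes them.
function pauseThenResume(kafka:Consumer consumer) {
    var assignment = consumer->getAssignment();
    if (assignment is kafka:ConsumerError) {
        io:println("Could not read the assignment: ", assignment);
    } else {
        // Stop fetching from every assigned partition.
        kafka:ConsumerError? pauseResult = consumer->pause(assignment);
        if (pauseResult is kafka:ConsumerError) {
            io:println("Pause failed: ", pauseResult);
        }

        // ... do other work while the partitions are paused ...

        // Start fetching from the same partitions again.
        kafka:ConsumerError? resumeResult = consumer->resume(assignment);
        if (resumeResult is kafka:ConsumerError) {
            io:println("Resume failed: ", resumeResult);
        }
    }
}
```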

seek

(PartitionOffset offset)

returns ConsumerError?
Seeks to a given offset in a topic partition.

Parameters

  • offset PartitionOffset
  • The PartitionOffset to seek to

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else ()
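
A short sketch of `seek` may help here. It assumes that `PartitionOffset` has the fields `partition` and `offset` and that `TopicPartition` has the fields `topic` and `partition`, none of which are documented on this page; `seekTo` is a hypothetical helper.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Hypothetical helper: moves the consumer position within one partition.
function seekTo(kafka:Consumer consumer, string topic, int partition, int offset) {
    // Assumed record shapes: PartitionOffset {partition, offset} wrapping
    // TopicPartition {topic, partition}.
    kafka:PartitionOffset target = {
        partition: {topic: topic, partition: partition},
        offset: offset
    };

    kafka:ConsumerError? result = consumer->seek(target);
    if (result is kafka:ConsumerError) {
        io:println("Seek failed: ", result);
    }
}
```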

seekToBeginning

(TopicPartition[] partitions)

returns ConsumerError?
Seeks to the beginning of the offsets for the given set of topic partitions.

Parameters

  • partitions TopicPartition[]
  • Topic partitions to seek to the beginning of

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else ()

seekToEnd

(TopicPartition[] partitions)

returns ConsumerError?
Seeks to the end of the offsets for the given set of topic partitions.

Parameters

  • partitions TopicPartition[]
  • Topic partitions to seek to the end of

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else ()

subscribe

(string[] topics)

returns ConsumerError?
Subscribes the consumer to the provided set of topics.
 kafka:ConsumerError? result = consumer->subscribe(["kafka-topic-1", "kafka-topic-2"]);

Parameters

  • topics string[]
  • Array of topics to be subscribed to

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else ()

subscribeToPattern

(string regex)

returns ConsumerError?
Subscribes the consumer to the topics that match the provided pattern.
 kafka:ConsumerError? result = consumer->subscribeToPattern("kafka.*");

Parameters

  • regex string
  • Pattern which should be matched with the topics to be subscribed to

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else ()

subscribeWithPartitionRebalance

(string[] topics, function(Consumer, TopicPartition[]) returns (()) onPartitionsRevoked, function(Consumer, TopicPartition[]) returns (()) onPartitionsAssigned)

returns ConsumerError?
Subscribes to the provided set of topics with rebalance listening enabled. This function can be used inside a service to subscribe to a set of topics while rebalancing the partition assignment of the consumers.

Parameters

  • topics string[]
  • Array of topics to be subscribed to

  • onPartitionsRevoked function(Consumer, TopicPartition[]) returns (())
  • Function which will be executed if partitions are revoked from this consumer

  • onPartitionsAssigned function(Consumer, TopicPartition[]) returns (())
  • Function which will be executed if partitions are assigned to this consumer

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else ()
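
The two callback parameters can be supplied as ordinary module-level functions whose signatures match the function types above. The sketch below only logs the number of partitions involved; the function names `onRevoked`, `onAssigned`, and `subscribeWithCallbacks` are hypothetical, and the `ballerina/kafka` import path is an assumption.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Hypothetical callback: runs when partitions are revoked from the consumer.
function onRevoked(kafka:Consumer consumer, kafka:TopicPartition[] partitions) {
    io:println("Partitions revoked: ", partitions.length());
}

// Hypothetical callback: runs when partitions are assigned to the consumer.
function onAssigned(kafka:Consumer consumer, kafka:TopicPartition[] partitions) {
    io:println("Partitions assigned: ", partitions.length());
}

// Hypothetical helper: subscribes with both rebalance callbacks registered.
function subscribeWithCallbacks(kafka:Consumer consumer) {
    kafka:ConsumerError? result = consumer->subscribeWithPartitionRebalance(
        ["kafka-topic-1", "kafka-topic-2"], onRevoked, onAssigned);
    if (result is kafka:ConsumerError) {
        io:println("Subscription failed: ", result);
    }
}
```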

unsubscribe

()

returns ConsumerError?
Unsubscribes from all the topic subscriptions.
 kafka:ConsumerError? result = consumer->unsubscribe();
  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else ()

__start

()

returns error?
Starts the registered services.
  • Return Type

    (error?)
  • A kafka:ConsumerError if an error is encountered while starting the server or else nil

__gracefulStop

()

returns error?
Stops the Kafka listener gracefully.
  • Return Type

    (error?)
  • A kafka:ConsumerError if an error is encountered during the listener stopping process or else nil

__immediateStop

()

returns error?
Stops the Kafka listener immediately.
  • Return Type

    (error?)
  • A kafka:ConsumerError if an error is encountered during the listener stopping process or else nil

__attach

(service s, string? name)

returns error?
Gets called every time a service attaches itself to the listener.

Parameters

  • s service
  • The service to be attached

  • name string? (default <string?> ())
  • Name of the service

  • Return Type

    (error?)
  • A kafka:ConsumerError if an error is encountered while attaching the service or else nil

__detach

(service s)

returns error?
Detaches a consumer service from the listener.

Parameters

  • s service
  • The service to be detached

  • Return Type

    (error?)
  • A kafka:ConsumerError if an error is encountered while detaching a service or else nil
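
The `__start`, `__attach`, `__detach`, and stop methods are listener lifecycle hooks, so application code usually does not call them directly; they come into play when the consumer is declared as a listener and a service is bound to it. The sketch below shows that arrangement under several assumptions: the `ballerina/kafka` import path, the configuration field names `bootstrapServers`, `groupId`, and `topics`, and the `onMessage` resource name and signature are not documented on this page.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Assumed field names; verify them against the ConsumerConfiguration record.
kafka:ConsumerConfiguration listenerConfig = {
    bootstrapServers: "localhost:9092",
    groupId: "example-group",
    topics: ["kafka-topic"]
};

// Declaring the consumer as a listener lets services attach to it.
listener kafka:Consumer consumerListener = new (listenerConfig);

service kafkaService on consumerListener {
    // Assumed resource name and signature for receiving polled records.
    resource function onMessage(kafka:Consumer consumer,
            kafka:ConsumerRecord[] records) {
        io:println("Received ", records.length(), " record(s)");
    }
}
```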