Clients - kafka : Consumer

Represents a Kafka consumer endpoint.

Constructor

__init

(ConsumerConfiguration config)

Remote Methods

assign

Assigns the consumer to a set of topic partitions.

close

Closes the consumer's connection to the external Kafka broker.

commit

Commits the currently consumed offsets for the consumer.

commitOffset

Commits the given offsets for the given topic partitions on behalf of the consumer.

connect

Connects the consumer to the host provided in the consumer configuration.

getAssignment

Returns the currently assigned partitions for the consumer.

getAvailableTopics

Returns the list of topics available to the consumer.

getBeginningOffsets

Returns the start offsets for the given set of partitions.

getCommittedOffset

Returns the last committed offset for the given topic partition.

getEndOffsets

Returns the end offsets for the given set of partitions.

getPausedPartitions

Returns the partitions that are currently paused.

getPositionOffset

Returns the offset of the next record that will be fetched, if a record exists at that position.

getSubscription

Returns the set of topics to which the consumer is currently subscribed.

getTopicPartitions

Retrieves the set of partitions that belong to the topic.

pause

Pauses the consumer's retrieval of messages from the given set of partitions.

poll

Polls the external broker for records.

resume

Resumes the consumer's retrieval of messages from a set of partitions that were paused earlier.

seek

Seeks to a given offset in a topic partition.

seekToBeginning

Seeks to the beginning of the offsets for the given set of topic partitions.

seekToEnd

Seeks to the end of the offsets for the given set of topic partitions.

subscribe

Subscribes the consumer to the provided set of topics.

subscribeToPattern

Subscribes the consumer to the topics that match the provided pattern.

subscribeWithPartitionRebalance

Subscribes the consumer to the provided set of topics with rebalance listening enabled. This function can be used inside a service to subscribe to a set of topics while rebalancing the partition assignment of the consumers.

unsubscribe

Unsubscribes the consumer from all topic subscriptions.

Methods

Fields

  • consumerConfig ConsumerConfiguration? (default ())
  • Used to store configurations related to a Kafka connection.

assign

(TopicPartition[] partitions)

returns ConsumerError?

Assigns the consumer to a set of topic partitions.

Parameters

  • partitions TopicPartition[]
  • Topic partitions to which the consumer is assigned.

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered, nil otherwise.

close

(int duration)

returns ConsumerError?

Closes the consumer's connection to the external Kafka broker.

Parameters

  • duration int (default -1)
  • Timeout duration for the close operation.

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered, nil otherwise.

commit

()

returns ConsumerError?

Commits the currently consumed offsets for the consumer.

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered, nil otherwise.
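
The commit call above is typically issued after a batch of polled records has been processed. The following is a minimal sketch, assuming Ballerina 1.x syntax and a broker at localhost:9092. The configuration fields `bootstrapServers`, `groupId`, and `autoCommit`, as well as the topic name `test-topic`, are not listed on this page and are assumptions.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Assumed configuration fields (not listed on this page). Auto-commit is
// disabled so that offsets are committed only by the explicit call below.
kafka:ConsumerConfiguration consumerConfig = {
    bootstrapServers: "localhost:9092",
    groupId: "example-group",
    autoCommit: false
};

public function main() {
    kafka:Consumer consumer = new (consumerConfig);

    // "test-topic" is a hypothetical topic name.
    var subscribeResult = consumer->subscribe(["test-topic"]);
    if (subscribeResult is error) {
        io:println("Subscribe failed: ", subscribeResult);
        return;
    }

    var polled = consumer->poll(1000);
    if (polled is error) {
        io:println("Poll failed: ", polled);
    } else {
        io:println("Processing ", polled.length(), " record(s)");
        // Commit the offsets consumed so far, once processing has finished.
        var commitResult = consumer->commit();
        if (commitResult is error) {
            io:println("Commit failed: ", commitResult);
        }
    }
}
```

Committing only after processing means a crash mid-batch leads to redelivery rather than silent loss, at the cost of possible duplicates.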

commitOffset

(PartitionOffset[] offsets, int duration)

returns ConsumerError?

Commits the given offsets for the given topic partitions on behalf of the consumer.

Parameters

  • offsets PartitionOffset[]
  • Offsets to be committed, each paired with its topic partition.

  • duration int (default -1)
  • Timeout duration for the commit operation.

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered, nil otherwise.

connect

()

returns ConsumerError?

Connects the consumer to the host provided in the consumer configuration.

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered, nil otherwise.

getAssignment

()

returns TopicPartition[] | ConsumerError

Returns the currently assigned partitions for the consumer.

  • Return Type

    (TopicPartition[] | ConsumerError)
  • Array of partitions assigned to the consumer if the operation succeeds, kafka:ConsumerError otherwise.

getAvailableTopics

(int duration)

returns string[] | ConsumerError

Returns the list of topics available to the consumer.

Parameters

  • duration int (default -1)
  • Timeout duration for the get available topics operation.

  • Return Type

    (string[] | ConsumerError)
  • Array of topics currently available (authorized) for the consumer to subscribe to, or kafka:ConsumerError if the operation fails.

getBeginningOffsets

(TopicPartition[] partitions, int duration)

returns PartitionOffset[] | ConsumerError

Returns the start offsets for the given set of partitions.

Parameters

  • partitions TopicPartition[]
  • Array of topic partitions for which to get the starting offsets.

  • duration int (default -1)
  • Timeout duration for the get beginning offsets operation.

  • Return Type

    (PartitionOffset[] | ConsumerError)
  • Starting offsets for the given partitions if the operation succeeds, or kafka:ConsumerError if it fails.

getCommittedOffset

(TopicPartition partition, int duration)

returns PartitionOffset | ConsumerError

Returns the last committed offset for the given topic partition.

Parameters

  • partition TopicPartition
  • Topic partition for which the consumer's committed offset is returned.

  • duration int (default -1)
  • Timeout duration for the get committed offset operation.

  • Return Type

    (PartitionOffset | ConsumerError)
  • Committed offset of the consumer for the given partition if the operation succeeds, or kafka:ConsumerError if it fails.

getEndOffsets

(TopicPartition[] partitions, int duration)

returns PartitionOffset[] | ConsumerError

Returns the end offsets for the given set of partitions.

Parameters

  • partitions TopicPartition[]
  • Set of partitions for which to get the end offsets.

  • duration int (default -1)
  • Timeout duration for the get end offsets operation.

  • Return Type

    (PartitionOffset[] | ConsumerError)
  • End offsets for the given partitions if the operation succeeds, or kafka:ConsumerError if it fails.

getPausedPartitions

()

returns TopicPartition[] | ConsumerError

Returns the partitions that are currently paused.

  • Return Type

    (TopicPartition[] | ConsumerError)
  • Set of partitions paused from message retrieval if the operation succeeds, or kafka:ConsumerError if it fails.

getPositionOffset

(TopicPartition partition, int duration)

returns int | ConsumerError

Returns the offset of the next record that will be fetched, if a record exists at that position.

Parameters

  • partition TopicPartition
  • Topic partition for which the position is required.

  • duration int (default -1)
  • Timeout duration for the get position offset operation.

  • Return Type

    (int | ConsumerError)
  • Offset that will be fetched next (if a record exists at that offset), or kafka:ConsumerError if the operation fails.

getSubscription

()

returns string[] | ConsumerError

Returns the set of topics to which the consumer is currently subscribed.

  • Return Type

    (string[] | ConsumerError)
  • Array of topics the consumer is subscribed to if the operation succeeds, or kafka:ConsumerError if it fails.

getTopicPartitions

(string topic, int duration)

returns TopicPartition[] | ConsumerError

Retrieves the set of partitions that belong to the topic.

Parameters

  • topic string
  • Topic for which partition information is needed.

  • duration int (default -1)
  • Timeout duration for the get topic partitions operation.

  • Return Type

    (TopicPartition[] | ConsumerError)
  • Array of partitions for the given topic if the operation succeeds, or kafka:ConsumerError if it fails.

pause

(TopicPartition[] partitions)

returns ConsumerError?

Pauses the consumer's retrieval of messages from the given set of partitions.

Parameters

  • partitions TopicPartition[]
  • Set of partitions for which message retrieval is paused.

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered, nil otherwise.

poll

(int timeoutValue)

returns ConsumerRecord[] | ConsumerError

Polls the external broker for records.

Parameters

  • timeoutValue int
  • Polling time in milliseconds.

  • Return Type

    (ConsumerRecord[] | ConsumerError)
  • Array of consumer records if the operation succeeds, or kafka:ConsumerError if it fails.
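
A subscribe-and-poll sequence built from the signatures on this page might look like the following minimal sketch. It assumes Ballerina 1.x syntax and a broker at localhost:9092. The configuration fields `bootstrapServers` and `groupId`, the topic name `test-topic`, and the record fields `topic` and `offset` are not listed on this page and are assumptions.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Assumed configuration fields (not listed on this page).
kafka:ConsumerConfiguration consumerConfig = {
    bootstrapServers: "localhost:9092",
    groupId: "example-group"
};

public function main() {
    kafka:Consumer consumer = new (consumerConfig);

    // Subscribe to a hypothetical topic before polling.
    var subscribeResult = consumer->subscribe(["test-topic"]);
    if (subscribeResult is error) {
        io:println("Subscribe failed: ", subscribeResult);
        return;
    }

    // Wait up to 1000 milliseconds for records.
    var polled = consumer->poll(1000);
    if (polled is error) {
        io:println("Poll failed: ", polled);
    } else {
        foreach var kafkaRecord in polled {
            // `topic` and `offset` are assumed fields of ConsumerRecord.
            io:println(kafkaRecord.topic, " @ offset ", kafkaRecord.offset);
        }
    }

    // Close using the documented default timeout of -1.
    var closeResult = consumer->close();
    if (closeResult is error) {
        io:println("Close failed: ", closeResult);
    }
}
```

A long-running consumer would normally call poll in a loop; a single call is shown here to keep the sketch short.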

resume

(TopicPartition[] partitions)

returns ConsumerError?

Resumes the consumer's retrieval of messages from a set of partitions that were paused earlier.

Parameters

  • partitions TopicPartition[]
  • Set of partitions for which message retrieval is resumed.

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered, nil otherwise.
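
The pause, getPausedPartitions, and resume calls can be combined as in the following minimal sketch, assuming Ballerina 1.x syntax. The configuration fields, the topic name `test-topic`, the `topic` and `partition` fields of TopicPartition, and the helper `report` are assumptions introduced for illustration.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Assumed configuration fields (not listed on this page).
kafka:ConsumerConfiguration consumerConfig = {
    bootstrapServers: "localhost:9092",
    groupId: "example-group"
};

// Hypothetical helper that prints a message when a step returns an error.
function report(string step, error? result) {
    if (result is error) {
        io:println(step, " failed: ", result);
    }
}

public function main() {
    kafka:Consumer consumer = new (consumerConfig);

    // Hypothetical topic and partition number; field names are assumed.
    kafka:TopicPartition[] partitions = [{ topic: "test-topic", partition: 0 }];

    report("assign", consumer->assign(partitions));

    // Stop fetching from the assigned partition without giving it up.
    report("pause", consumer->pause(partitions));

    var paused = consumer->getPausedPartitions();
    if (paused is error) {
        io:println("getPausedPartitions failed: ", paused);
    } else {
        io:println("Paused partition count: ", paused.length());
    }

    // Fetching continues from the current position after resuming.
    report("resume", consumer->resume(partitions));
}
```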

seek

(PartitionOffset offset)

returns ConsumerError?

Seeks to a given offset in a topic partition.

Parameters

  • offset PartitionOffset
  • Topic partition and the offset within it to seek to.

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered, nil otherwise.
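
To illustrate how seek relates to assign and getPositionOffset, here is a minimal sketch assuming Ballerina 1.x syntax. The configuration fields, the topic name `test-topic`, the offset value 42, and the field names of TopicPartition (`topic`, `partition`) and PartitionOffset (`partition`, `offset`) are assumptions, since this page does not list them.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Assumed configuration fields (not listed on this page).
kafka:ConsumerConfiguration consumerConfig = {
    bootstrapServers: "localhost:9092",
    groupId: "example-group"
};

public function main() {
    kafka:Consumer consumer = new (consumerConfig);

    // Hypothetical topic and partition number; field names are assumed.
    kafka:TopicPartition topicPartition = { topic: "test-topic", partition: 0 };

    // A partition has to be assigned to the consumer before seeking in it.
    var assignResult = consumer->assign([topicPartition]);
    if (assignResult is error) {
        io:println("Assign failed: ", assignResult);
        return;
    }

    // Move the fetch position to an arbitrary example offset.
    kafka:PartitionOffset target = { partition: topicPartition, offset: 42 };
    var seekResult = consumer->seek(target);
    if (seekResult is error) {
        io:println("Seek failed: ", seekResult);
        return;
    }

    // The position reported afterwards is the next offset to be fetched.
    var position = consumer->getPositionOffset(topicPartition);
    if (position is error) {
        io:println("getPositionOffset failed: ", position);
    } else {
        io:println("Next offset to fetch: ", position);
    }
}
```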

seekToBeginning

(TopicPartition[] partitions)

returns ConsumerError?

Seeks to the beginning of the offsets for the given set of topic partitions.

Parameters

  • partitions TopicPartition[]
  • Set of topic partitions to seek to the beginning of.

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered, nil otherwise.

seekToEnd

(TopicPartition[] partitions)

returns ConsumerError?

Seeks to the end of the offsets for the given set of topic partitions.

Parameters

  • partitions TopicPartition[]
  • Set of topic partitions to seek to the end of.

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered, nil otherwise.

subscribe

(string[] topics)

returns ConsumerError?

Subscribes the consumer to the provided set of topics.

Parameters

  • topics string[]
  • Array of topics to be subscribed.

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered, nil otherwise.

subscribeToPattern

(string regex)

returns ConsumerError?

Subscribes the consumer to the topics that match the provided pattern.

Parameters

  • regex string
  • Pattern that topic names must match to be subscribed.

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered, nil otherwise.

subscribeWithPartitionRebalance

(string[] topics, function(Consumer, TopicPartition[]) returns (()) onPartitionsRevoked, function(Consumer, TopicPartition[]) returns (()) onPartitionsAssigned)

returns ConsumerError?

Subscribes the consumer to the provided set of topics with rebalance listening enabled. This function can be used inside a service to subscribe to a set of topics while rebalancing the partition assignment of the consumers.

Parameters

  • topics string[]
  • Array of topics to be subscribed.

  • onPartitionsRevoked function(Consumer, TopicPartition[]) returns (())
  • Function which will be executed if partitions are revoked from this consumer.

  • onPartitionsAssigned function(Consumer, TopicPartition[]) returns (())
  • Function which will be executed if partitions are assigned to this consumer.

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered, nil otherwise.
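
The two callback parameters can be supplied as ordinary functions whose signatures match the function types above. The following is a minimal sketch, assuming Ballerina 1.x syntax. The configuration fields, the topic name `test-topic`, and the function names `onRevoked` and `onAssigned` are assumptions introduced for illustration.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Assumed configuration fields (not listed on this page).
kafka:ConsumerConfiguration consumerConfig = {
    bootstrapServers: "localhost:9092",
    groupId: "example-group"
};

// Hypothetical callback: runs if partitions are revoked from this consumer.
function onRevoked(kafka:Consumer consumer, kafka:TopicPartition[] partitions) {
    io:println("Partitions revoked: ", partitions.length());
}

// Hypothetical callback: runs if partitions are assigned to this consumer.
function onAssigned(kafka:Consumer consumer, kafka:TopicPartition[] partitions) {
    io:println("Partitions assigned: ", partitions.length());
}

public function main() {
    kafka:Consumer consumer = new (consumerConfig);

    // "test-topic" is a hypothetical topic name.
    var result = consumer->subscribeWithPartitionRebalance(
        ["test-topic"], onRevoked, onAssigned);
    if (result is error) {
        io:println("Subscribe failed: ", result);
    }
}
```

A revoke callback is a natural place to commit offsets for the partitions being handed over, although that step is omitted from this sketch.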

unsubscribe

()

returns ConsumerError?

Unsubscribes the consumer from all topic subscriptions.

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered, nil otherwise.

__start

()

returns error?

Starts the registered service.

  • Return Type

    (error?)
  • An error if one is encountered while starting the server, nil otherwise.

__gracefulStop

()

returns error?

Stops the Kafka listener gracefully.

  • Return Type

    (error?)
  • An error if one occurs while stopping the listener, nil otherwise.

__immediateStop

()

returns error?

Stops the Kafka listener immediately.

  • Return Type

    (error?)
  • An error if one occurs while stopping the listener, nil otherwise.

__attach

(service s, string? name)

returns error?

Gets called every time a service attaches itself to this listener.

Parameters

  • s service
  • The type of the service to be registered.

  • name string? (default ())
  • Name of the service.

  • Return Type

    (error?)
  • An error if one is encountered while attaching the service, nil otherwise.

__detach

(service s)

returns error?

Detaches a consumer service from the listener.

Parameters

  • s service
  • The service to be detached.

  • Return Type

    (error?)
  • An error if one occurs while detaching the service, nil otherwise.