Clients - kafka : Consumer

Represents a Kafka consumer endpoint.

Constructor

__init

(ConsumerConfiguration config)

Remote Methods

assign

Assigns consumer to a set of topic partitions.

close

Closes the consumer connection of the external Kafka broker.

 kafka:ConsumerError? result = consumer->close();

commit

Commits the current consumed offsets for the consumer.

 kafka:ConsumerError? result = consumer->commit();

commitOffset

Commits the given offsets for the given topic partitions on behalf of the consumer.

connect

Connects the consumer to the provided host in the consumer configs.

 kafka:ConsumerError? result = consumer->connect();

getAssignment

Retrieves the currently-assigned partitions for the consumer.

 kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getAssignment();

getAvailableTopics

Retrieves the available list of topics for a particular consumer.

 string[]|kafka:ConsumerError result = consumer->getAvailableTopics();

getBeginningOffsets

Retrieves the start offsets for the given set of partitions.

getCommittedOffset

Retrieves the last committed offset for the given topic partition.

getEndOffsets

Retrieves the last offsets for the given set of partitions.

getPausedPartitions

Retrieves the partitions that are currently paused.

 kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getPausedPartitions();

getPositionOffset

Retrieves the offset of the next record that will be fetched, if a record exists at that position.

getSubscription

Retrieves the set of topics to which the consumer is currently subscribed.

 string[]|kafka:ConsumerError result = consumer->getSubscription();

getTopicPartitions

Retrieves the set of partitions that belong to the given topic.

 kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getTopicPartitions("kafka-topic");

pause

Pauses retrieving messages from a set of partitions.

poll

Polls the consumer for the records of an external broker.

 kafka:ConsumerRecord[]|kafka:ConsumerError result = consumer->poll(1000);

resume

Resumes retrieving messages from a set of partitions that were paused earlier.

seek

Seeks to a given offset in a topic partition.

seekToBeginning

Seeks to the beginning of the offsets for the given set of topic partitions.

seekToEnd

Seeks to the end of the offsets for the given set of topic partitions.

subscribe

Subscribes the consumer to the provided set of topics.

 kafka:ConsumerError? result = consumer->subscribe(["kafka-topic-1", "kafka-topic-2"]);

subscribeToPattern

Subscribes the consumer to the topics that match the provided pattern.

 kafka:ConsumerError? result = consumer->subscribeToPattern("kafka.*");

subscribeWithPartitionRebalance

Subscribes to the provided set of topics with rebalance listening enabled. This function can be used inside a service to subscribe to a set of topics while the partition assignment of the consumers is rebalanced.

unsubscribe

Unsubscribes from all the topic subscriptions.

 kafka:ConsumerError? result = consumer->unsubscribe();

Methods

__start

Starts the registered services.

__gracefulStop

Stops the Kafka listener gracefully.

__immediateStop

Stops the Kafka listener immediately.

__attach

Gets called every time a service attaches itself to the listener.

__detach

Detaches a consumer service from the listener.

Fields

assign

(TopicPartition[] partitions)

returns ConsumerError?

Assigns consumer to a set of topic partitions.

Parameters

  • partitions TopicPartition[]
  • Topic partitions to be assigned to the consumer

  • Return Type

    (ConsumerError?)
  • A kafka:ConsumerError if an error is encountered or else '()'
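A minimal sketch of a manual assignment. It assumes the module is imported as `ballerina/kafka` (this page does not state the import path) and that `kafka:TopicPartition` is a record with `topic` and `partition` fields. The helper name `assignPartitions` is hypothetical.

```ballerina
import ballerina/kafka;

// Hypothetical helper: assigns the consumer to partitions 0 and 1 of "kafka-topic".
function assignPartitions(kafka:Consumer consumer) returns kafka:ConsumerError? {
    // Assumes TopicPartition has the fields `topic` (string) and `partition` (int).
    kafka:TopicPartition[] partitions = [
        { topic: "kafka-topic", partition: 0 },
        { topic: "kafka-topic", partition: 1 }
    ];
    return consumer->assign(partitions);
}
```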

close

(int duration)

returns ConsumerError?

Closes the consumer connection of the external Kafka broker.

 kafka:ConsumerError? result = consumer->close();

Parameters

  • duration int (default -1)
  • Timeout duration for the close operation execution

  • Return Type

    (ConsumerError?)
  • A kafka:ConsumerError if an error is encountered or else '()'

commit

()

returns ConsumerError?

Commits the current consumed offsets for the consumer.

 kafka:ConsumerError? result = consumer->commit();

  • Return Type

    (ConsumerError?)
  • A kafka:ConsumerError if an error is encountered or else '()'

commitOffset

(PartitionOffset[] offsets, int duration)

returns ConsumerError?

Commits the given offsets for the given topic partitions on behalf of the consumer.

Parameters

  • offsets PartitionOffset[]
  • Offsets to be committed, each paired with its topic partition

  • duration int (default -1)
  • Timeout duration for the commit operation execution

  • Return Type

    (ConsumerError?)
  • A kafka:ConsumerError if an error is encountered or else '()'
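A minimal sketch of committing an explicit offset for one partition. It assumes the `ballerina/kafka` import path and that `kafka:PartitionOffset` is a record with a `partition` field (a `TopicPartition`) and an int `offset` field. The helper name `commitUpTo` is hypothetical.

```ballerina
import ballerina/kafka;

// Hypothetical helper: commits `nextOffset` for partition 0 of "kafka-topic".
function commitUpTo(kafka:Consumer consumer, int nextOffset) returns kafka:ConsumerError? {
    // Assumes PartitionOffset has the fields `partition` and `offset`.
    kafka:PartitionOffset[] offsets = [
        { partition: { topic: "kafka-topic", partition: 0 }, offset: nextOffset }
    ];
    // `duration` is left at its documented default of -1.
    return consumer->commitOffset(offsets);
}
```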

connect

()

returns ConsumerError?

Connects the consumer to the provided host in the consumer configs.

 kafka:ConsumerError? result = consumer->connect();

  • Return Type

    (ConsumerError?)
  • A kafka:ConsumerError if an error is encountered or else '()'

getAssignment

()

returns TopicPartition[] | ConsumerError

Retrieves the currently-assigned partitions for the consumer.

 kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getAssignment();

  • Return Type

    (TopicPartition[] | ConsumerError)
  • Array of partitions assigned to the consumer if the operation succeeds, or else a kafka:ConsumerError

getAvailableTopics

(int duration)

returns string[] | ConsumerError

Retrieves the available list of topics for a particular consumer.

 string[]|kafka:ConsumerError result = consumer->getAvailableTopics();

Parameters

  • duration int (default -1)
  • Timeout duration for the execution of the get available topics operation

  • Return Type

    (string[] | ConsumerError)
  • Array of topics currently available (authorized) for the consumer to subscribe to, or else a kafka:ConsumerError

getBeginningOffsets

(TopicPartition[] partitions, int duration)

returns PartitionOffset[] | ConsumerError

Retrieves the start offsets for the given set of partitions.

Parameters

  • partitions TopicPartition[]
  • Array of topic partitions for which to get the starting offsets

  • duration int (default -1)
  • Timeout duration for the get beginning offsets execution

  • Return Type

    (PartitionOffset[] | ConsumerError)
  • Starting offsets for the given partitions if the operation succeeds, or else a kafka:ConsumerError

getCommittedOffset

(TopicPartition partition, int duration)

returns PartitionOffset | ConsumerError

Retrieves the last committed offset for the given topic partition.

Parameters

  • partition TopicPartition
  • The TopicPartition for which the consumer's committed offset is returned

  • duration int (default -1)
  • Timeout duration for the get committed offset operation to execute

  • Return Type

    (PartitionOffset | ConsumerError)
  • Committed offset of the consumer for the given partition if the operation succeeds, or else a kafka:ConsumerError

getEndOffsets

(TopicPartition[] partitions, int duration)

returns PartitionOffset[] | ConsumerError

Retrieves the last offsets for the given set of partitions.

Parameters

  • partitions TopicPartition[]
  • Set of partitions for which to get the last offsets

  • duration int (default -1)
  • Timeout duration for the get end offsets operation to execute
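The results of `getBeginningOffsets` and `getEndOffsets` can be combined to estimate how many offsets a partition currently spans. The following is a sketch only: it assumes the `ballerina/kafka` import path and that `kafka:PartitionOffset` has an int `offset` field, and the helper name `offsetSpan` is hypothetical. The difference is an upper bound on the number of records, since offsets may have gaps.

```ballerina
import ballerina/kafka;

// Hypothetical helper: estimates how many offsets a partition currently spans.
function offsetSpan(kafka:Consumer consumer, kafka:TopicPartition tp)
        returns int|kafka:ConsumerError {
    kafka:TopicPartition[] partitions = [tp];
    kafka:PartitionOffset[] beginOffsets = check consumer->getBeginningOffsets(partitions);
    kafka:PartitionOffset[] endOffsets = check consumer->getEndOffsets(partitions);
    // Guard against an empty result before indexing.
    if (beginOffsets.length() == 0 || endOffsets.length() == 0) {
        return 0;
    }
    // Assumes PartitionOffset exposes an int field named `offset`.
    return endOffsets[0].offset - beginOffsets[0].offset;
}
```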

getPausedPartitions

()

returns TopicPartition[] | ConsumerError

Retrieves the partitions that are currently paused.

 kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getPausedPartitions();

  • Return Type

    (TopicPartition[] | ConsumerError)
  • Set of partitions paused from message retrieval if the operation succeeds, or else a kafka:ConsumerError

getPositionOffset

(TopicPartition partition, int duration)

returns int | ConsumerError

Retrieves the offset of the next record that will be fetched, if a record exists at that position.

Parameters

  • partition TopicPartition
  • The TopicPartition for which the position is required

  • duration int (default -1)
  • Timeout duration for the get position offset operation to execute

  • Return Type

    (int | ConsumerError)
  • Offset that will be fetched next (if a record exists at that offset), or else a kafka:ConsumerError if the operation fails
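As an illustration of how `getPositionOffset` and `getCommittedOffset` relate, the sketch below computes how far the consumer's position is ahead of its last commit. It assumes the `ballerina/kafka` import path and an int `offset` field on `kafka:PartitionOffset`; the helper name `uncommittedSpan` is hypothetical, and what `getCommittedOffset` returns when nothing has been committed yet is not covered on this page.

```ballerina
import ballerina/kafka;

// Hypothetical helper: returns how many offsets lie between the last commit
// and the consumer's current position in the given partition.
function uncommittedSpan(kafka:Consumer consumer, kafka:TopicPartition tp)
        returns int|kafka:ConsumerError {
    kafka:PartitionOffset committed = check consumer->getCommittedOffset(tp);
    int position = check consumer->getPositionOffset(tp);
    // Assumes PartitionOffset exposes an int field named `offset`.
    return position - committed.offset;
}
```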

getSubscription

()

returns string[] | ConsumerError

Retrieves the set of topics to which the consumer is currently subscribed.

 string[]|kafka:ConsumerError result = consumer->getSubscription();

  • Return Type

    (string[] | ConsumerError)
  • Array of topics the consumer is subscribed to if the operation succeeds, or else a kafka:ConsumerError

getTopicPartitions

(string topic, int duration)

returns TopicPartition[] | ConsumerError

Retrieves the set of partitions that belong to the given topic.

 kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getTopicPartitions("kafka-topic");

Parameters

  • topic string
  • The topic for which the partition information is needed

  • duration int (default -1)
  • Timeout duration for the get topic partitions operation to execute

  • Return Type

    (TopicPartition[] | ConsumerError)
  • Array of partitions for the given topic if the operation succeeds, or else a kafka:ConsumerError

pause

(TopicPartition[] partitions)

returns ConsumerError?

Pauses retrieving messages from a set of partitions.

Parameters

  • partitions TopicPartition[]
  • Partitions for which message retrieval is paused

  • Return Type

    (ConsumerError?)
  • A kafka:ConsumerError if an error is encountered or else '()'

poll

(int timeoutValue)

returns ConsumerRecord[] | ConsumerError

Polls the consumer for the records of an external broker.

 kafka:ConsumerRecord[]|kafka:ConsumerError result = consumer->poll(1000);

Parameters

  • timeoutValue int
  • Polling time in milliseconds
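A minimal sketch of a single poll-and-commit cycle using `poll` and `commit` as documented on this page. It assumes the `ballerina/kafka` import path and that `kafka:ConsumerRecord` exposes `topic`, `partition`, and `offset` fields; the helper name `pollOnce` is hypothetical.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Hypothetical helper: polls once, logs where each record came from,
// then commits the consumed offsets.
function pollOnce(kafka:Consumer consumer) returns kafka:ConsumerError? {
    // Waits up to 1000 milliseconds for records.
    kafka:ConsumerRecord[] records = check consumer->poll(1000);
    foreach var rec in records {
        // Assumes ConsumerRecord has `topic`, `partition`, and `offset` fields.
        io:println("topic=", rec.topic, " partition=", rec.partition, " offset=", rec.offset);
    }
    return consumer->commit();
}
```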

resume

(TopicPartition[] partitions)

returns ConsumerError?

Resumes retrieving messages from a set of partitions that were paused earlier.

Parameters

  • partitions TopicPartition[]
  • Partitions for which message retrieval is resumed

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else ()
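`pause` and `resume` are typically used as a pair around work that should not be interrupted by new records. A sketch, assuming the `ballerina/kafka` import path; the helper name `withPausedPartitions` is hypothetical and the slow work is left as a placeholder comment.

```ballerina
import ballerina/kafka;

// Hypothetical helper: pauses the given partitions, leaves room for slow
// processing, then resumes retrieval from the same partitions.
function withPausedPartitions(kafka:Consumer consumer, kafka:TopicPartition[] partitions)
        returns kafka:ConsumerError? {
    check consumer->pause(partitions);
    // ... process already-fetched records here ...
    return consumer->resume(partitions);
}
```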

seek

(PartitionOffset offset)

returns ConsumerError?

Seeks to a given offset in a topic partition.

Parameters

  • offset PartitionOffset
  • The topic partition and the offset to seek to

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else ()
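A minimal sketch of seeking to a specific offset. It assumes the `ballerina/kafka` import path and that `kafka:PartitionOffset` has `partition` and `offset` fields; the helper name `seekTo` is hypothetical.

```ballerina
import ballerina/kafka;

// Hypothetical helper: moves the consumer's position in partition 0 of
// "kafka-topic" to `targetOffset`.
function seekTo(kafka:Consumer consumer, int targetOffset) returns kafka:ConsumerError? {
    // Assumes PartitionOffset has the fields `partition` and `offset`.
    kafka:PartitionOffset target = {
        partition: { topic: "kafka-topic", partition: 0 },
        offset: targetOffset
    };
    return consumer->seek(target);
}
```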

seekToBeginning

(TopicPartition[] partitions)

returns ConsumerError?

Seeks to the beginning of the offsets for the given set of topic partitions.

Parameters

  • partitions TopicPartition[]
  • Topic partitions in which to seek to the beginning

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else ()

seekToEnd

(TopicPartition[] partitions)

returns ConsumerError?

Seeks to the end of the offsets for the given set of topic partitions.

Parameters

  • partitions TopicPartition[]
  • Topic partitions in which to seek to the end

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else ()
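`seekToBeginning` and `seekToEnd` both take the partitions to reposition, which can be obtained from `getAssignment`. The sketch below rewinds every currently assigned partition; it assumes the `ballerina/kafka` import path, and the helper name `replayAssigned` is hypothetical.

```ballerina
import ballerina/kafka;

// Hypothetical helper: rewinds all currently assigned partitions so that
// the next poll starts from their earliest available offsets.
function replayAssigned(kafka:Consumer consumer) returns kafka:ConsumerError? {
    kafka:TopicPartition[] assigned = check consumer->getAssignment();
    return consumer->seekToBeginning(assigned);
}
```

Swapping `seekToBeginning` for `seekToEnd` would instead skip ahead to the latest offsets.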

subscribe

(string[] topics)

returns ConsumerError?

Subscribes the consumer to the provided set of topics.

 kafka:ConsumerError? result = consumer->subscribe(["kafka-topic-1", "kafka-topic-2"]);

Parameters

  • topics string[]
  • Array of topics to be subscribed to

  • Return Type

    (ConsumerError?)
  • A kafka:ConsumerError if an error is encountered or else '()'

subscribeToPattern

(string regex)

returns ConsumerError?

Subscribes the consumer to the topics that match the provided pattern.

 kafka:ConsumerError? result = consumer->subscribeToPattern("kafka.*");

Parameters

  • regex string
  • Pattern that a topic name must match for the consumer to be subscribed to it

  • Return Type

    (ConsumerError?)
  • A kafka:ConsumerError if an error is encountered or else '()'

subscribeWithPartitionRebalance

(string[] topics, function(Consumer, TopicPartition[]) returns (()) onPartitionsRevoked, function(Consumer, TopicPartition[]) returns (()) onPartitionsAssigned)

returns ConsumerError?

Subscribes to the provided set of topics with rebalance listening enabled. This function can be used inside a service to subscribe to a set of topics while the partition assignment of the consumers is rebalanced.

Parameters

  • topics string[]
  • Array of topics to be subscribed to

  • onPartitionsRevoked function(Consumer, TopicPartition[]) returns (())
  • Function which will be executed if partitions are revoked from this consumer

  • onPartitionsAssigned function(Consumer, TopicPartition[]) returns (())
  • Function which will be executed if partitions are assigned to this consumer

  • Return Type

    (ConsumerError?)
  • kafka:ConsumerError if an error is encountered or else ()
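A minimal sketch of subscribing with rebalance callbacks that only log what happened. It assumes the `ballerina/kafka` import path; the function names `onRevoked`, `onAssigned`, and `subscribeWithLogging` are hypothetical, and their signatures follow the callback type shown in the method signature above.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Called when partitions are taken away from this consumer.
function onRevoked(kafka:Consumer consumer, kafka:TopicPartition[] partitions) {
    io:println("Partitions revoked: ", partitions.length());
}

// Called when partitions are handed to this consumer.
function onAssigned(kafka:Consumer consumer, kafka:TopicPartition[] partitions) {
    io:println("Partitions assigned: ", partitions.length());
}

// Hypothetical helper: subscribes to one topic with both callbacks registered.
function subscribeWithLogging(kafka:Consumer consumer) returns kafka:ConsumerError? {
    return consumer->subscribeWithPartitionRebalance(["kafka-topic-1"], onRevoked, onAssigned);
}
```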

unsubscribe

()

returns ConsumerError?

Unsubscribes from all the topic subscriptions.

 kafka:ConsumerError? result = consumer->unsubscribe();

  • Return Type

    (ConsumerError?)
  • A kafka:ConsumerError if an error is encountered or else '()'

__start

()

returns error?

Starts the registered services.

  • Return Type

    (error?)
  • A kafka:ConsumerError if an error is encountered while starting the server or else '()'

__gracefulStop

()

returns error?

Stops the Kafka listener gracefully.

  • Return Type

    (error?)
  • A kafka:ConsumerError if an error is encountered during the listener stopping process or else '()'

__immediateStop

()

returns error?

Stops the Kafka listener immediately.

  • Return Type

    (error?)
  • A kafka:ConsumerError if an error is encountered during the listener stopping process or else '()'

__attach

(service s, string? name)

returns error?

Gets called every time a service attaches itself to the listener.

Parameters

  • s service
  • The service to be attached

  • name string? (default ())
  • Name of the service

  • Return Type

    (error?)
  • A kafka:ConsumerError if an error is encountered while attaching the service or else '()'

__detach

(service s)

returns error?

Detaches a consumer service from the listener.

Parameters

  • s service
  • The service to be detached

  • Return Type

    (error?)
  • A kafka:ConsumerError if an error is encountered while detaching a service or else '()'