Clients - kafka : Consumer
Represents a Kafka consumer endpoint.
Constructor
__init(ConsumerConfiguration config)
- config ConsumerConfiguration: Configurations related to the consumer endpoint
Remote Methods
assign | Assigns the consumer to a set of topic partitions. |
close | Closes the consumer's connection to the external Kafka broker. |
commit | Commits the currently consumed offsets for the consumer. |
commitOffset | Commits the given offsets and partitions for the given topics, for the consumer. |
connect | Connects the consumer to the host provided in the consumer configuration. |
getAssignment | Retrieves the partitions currently assigned to the consumer. |
getAvailableTopics | Retrieves the list of topics available to a particular consumer. |
getBeginningOffsets | Retrieves the start offsets for the given set of partitions. |
getCommittedOffset | Retrieves the last committed offset for the given topic partition. |
getEndOffsets | Retrieves the last offsets for the given set of partitions. |
getPausedPartitions | Retrieves the partitions that are currently paused. |
getPositionOffset | Retrieves the offset of the next record that will be fetched, if a record exists at that position. |
getSubscription | Retrieves the set of topics to which the consumer is currently subscribed. |
getTopicPartitions | Retrieves the set of partitions that belong to the given topic. |
pause | Pauses retrieving messages from a set of partitions. |
poll | Polls the consumer for records from an external broker. |
resume | Resumes retrieving messages from a set of partitions that were paused earlier. |
seek | Seeks to a given offset in a topic partition. |
seekToBeginning | Seeks to the beginning of the offsets for the given set of topic partitions. |
seekToEnd | Seeks to the end of the offsets for the given set of topic partitions. |
subscribe | Subscribes the consumer to the provided set of topics. |
subscribeToPattern | Subscribes the consumer to the topics that match the provided pattern. |
subscribeWithPartitionRebalance | Subscribes to the provided set of topics with rebalance listening enabled. |
unsubscribe | Unsubscribes from all the topic subscriptions. |
Methods
Starts the registered services.
Stops the Kafka listener.
Stops the Kafka listener.
Gets called every time a service attaches itself to the listener.
Detaches a consumer service from the listener.
Fields
- consumerConfig ConsumerConfiguration: Used to store configurations related to a Kafka connection
assign
(TopicPartition[] partitions)
returns ConsumerError?
Assigns the consumer to a set of topic partitions.
Parameters
- partitions TopicPartition[]: Topic partitions to be assigned
Return Type
(ConsumerError?) A kafka:ConsumerError if an error is encountered or else ()
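The call can be sketched as follows. This is a minimal example under stated assumptions, not the module's official usage: it assumes Ballerina 1.x syntax, the `ballerina/kafka` import path, and a `TopicPartition` record with `topic` and `partition` fields. The function name `assignSinglePartition`, the topic name, and the partition number are hypothetical.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Hypothetical helper: manually assigns one partition to an existing consumer.
function assignSinglePartition(kafka:Consumer consumer) {
    // Assumption: TopicPartition has the fields `topic` and `partition`.
    kafka:TopicPartition topicPartition = { topic: "kafka-topic", partition: 0 };

    kafka:ConsumerError? result = consumer->assign([topicPartition]);
    if (result is kafka:ConsumerError) {
        io:println("Failed to assign the partition: ", result);
    }
}
```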
close
(int duration)
returns ConsumerError?
Closes the consumer's connection to the external Kafka broker.
kafka:ConsumerError? result = consumer->close();
Parameters
- duration int (default -1): Timeout duration for the close operation
Return Type
(ConsumerError?) A kafka:ConsumerError if an error is encountered or else ()
commit
()
returns ConsumerError?
Commits the currently consumed offsets for the consumer.
kafka:ConsumerError? result = consumer->commit();
Return Type
(ConsumerError?) A kafka:ConsumerError if an error is encountered or else ()
commitOffset
(PartitionOffset[] offsets, int duration)
returns ConsumerError?
Commits the given offsets and partitions for the given topics, for the consumer.
Parameters
- offsets PartitionOffset[]: Offsets to be committed
- duration int (default -1): Timeout duration for the commit operation
Return Type
(ConsumerError?) A kafka:ConsumerError if an error is encountered or else ()
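Committing an explicit offset might look like the sketch below. It assumes Ballerina 1.x syntax, the `ballerina/kafka` import path, and a `PartitionOffset` record made of a `partition` (a `TopicPartition`) and an integer `offset`. The function name, topic name, and offset value are hypothetical.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Hypothetical helper: commits a specific offset for one topic partition.
function commitExplicitOffset(kafka:Consumer consumer) {
    // Assumption: PartitionOffset has the fields `partition` and `offset`.
    kafka:PartitionOffset partitionOffset = {
        partition: { topic: "kafka-topic", partition: 0 },
        offset: 10
    };

    // The `duration` parameter is left at its default of -1.
    kafka:ConsumerError? result = consumer->commitOffset([partitionOffset]);
    if (result is kafka:ConsumerError) {
        io:println("Failed to commit the offset: ", result);
    }
}
```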
connect
()
returns ConsumerError?
Connects the consumer to the host provided in the consumer configuration.
kafka:ConsumerError? result = consumer->connect();
Return Type
(ConsumerError?) A kafka:ConsumerError if an error is encountered or else ()
getAssignment
()
returns TopicPartition[] | ConsumerError
Retrieves the partitions currently assigned to the consumer.
kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getAssignment();
Return Type
(TopicPartition[] | ConsumerError) Array of partitions assigned to the consumer if the operation succeeds or else a kafka:ConsumerError
getAvailableTopics
(int duration)
returns string[] | ConsumerError
Retrieves the list of topics available to a particular consumer.
string[]|kafka:ConsumerError result = consumer->getAvailableTopics();
Parameters
- duration int (default -1): Timeout duration for the get available topics operation
Return Type
(string[] | ConsumerError) Array of topics currently available (authorized) for the consumer to subscribe to or else a kafka:ConsumerError
getBeginningOffsets
(TopicPartition[] partitions, int duration)
returns PartitionOffset[] | ConsumerError
Retrieves the start offsets for the given set of partitions.
Parameters
- partitions TopicPartition[]: Array of topic partitions for which to get the starting offsets
- duration int (default -1): Timeout duration for the get beginning offsets operation
Return Type
(PartitionOffset[] | ConsumerError) Starting offsets for the given partitions if the operation succeeds or else a kafka:ConsumerError
getCommittedOffset
(TopicPartition partition, int duration)
returns PartitionOffset | ConsumerError
Retrieves the last committed offset for the given topic partition.
Parameters
- partition TopicPartition: The TopicPartition for which the committed offset is returned
- duration int (default -1): Timeout duration for the get committed offset operation
Return Type
(PartitionOffset | ConsumerError) Committed offset of the consumer for the given partition if the operation succeeds or else a kafka:ConsumerError
getEndOffsets
(TopicPartition[] partitions, int duration)
returns PartitionOffset[] | ConsumerError
Retrieves the last offsets for the given set of partitions.
Parameters
- partitions TopicPartition[]: Set of partitions for which to get the last offsets
- duration int (default -1): Timeout duration for the get end offsets operation
Return Type
(PartitionOffset[] | ConsumerError) End offsets for the given partitions if the operation succeeds or else a kafka:ConsumerError
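The start and end offsets can be read together to see the offset range of each partition. The sketch below assumes Ballerina 1.x syntax, the `ballerina/kafka` import path, and `PartitionOffset`/`TopicPartition` records with `partition`, `offset`, and `topic` fields. The function name `printOffsetRanges` is hypothetical.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Hypothetical helper: prints the start and end offsets of the given partitions.
function printOffsetRanges(kafka:Consumer consumer, kafka:TopicPartition[] partitions) {
    var beginning = consumer->getBeginningOffsets(partitions);
    if (beginning is kafka:ConsumerError) {
        io:println("Failed to get the beginning offsets: ", beginning);
        return;
    }

    var end = consumer->getEndOffsets(partitions);
    if (end is kafka:ConsumerError) {
        io:println("Failed to get the end offsets: ", end);
        return;
    }

    // Assumption: each PartitionOffset carries its TopicPartition and an int offset.
    foreach var startOffset in beginning {
        io:println(startOffset.partition.topic, "-", startOffset.partition.partition,
            " starts at ", startOffset.offset);
    }
    foreach var endOffset in end {
        io:println(endOffset.partition.topic, "-", endOffset.partition.partition,
            " ends at ", endOffset.offset);
    }
}
```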
getPausedPartitions
()
returns TopicPartition[] | ConsumerError
Retrieves the partitions that are currently paused.
kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getPausedPartitions();
Return Type
(TopicPartition[] | ConsumerError) Set of partitions paused from message retrieval if the operation succeeds or else a kafka:ConsumerError
getPositionOffset
(TopicPartition partition, int duration)
returns int | ConsumerError
Retrieves the offset of the next record that will be fetched, if a record exists at that position.
Parameters
- partition TopicPartition: The TopicPartition for which the position is required
- duration int (default -1): Timeout duration for the get position offset operation
Return Type
(int | ConsumerError) Offset that will be fetched next (if a record exists at that offset) or else a kafka:ConsumerError if the operation fails
getSubscription
()
returns string[] | ConsumerError
Retrieves the set of topics to which the consumer is currently subscribed.
string[]|kafka:ConsumerError result = consumer->getSubscription();
Return Type
(string[] | ConsumerError) Array of topics subscribed to by the consumer if the operation succeeds or else a kafka:ConsumerError
getTopicPartitions
(string topic, int duration)
returns TopicPartition[] | ConsumerError
Retrieves the set of partitions that belong to the given topic.
kafka:TopicPartition[]|kafka:ConsumerError result = consumer->getTopicPartitions("kafka-topic");
Parameters
- topic string: The topic for which the partition information is needed
- duration int (default -1): Timeout duration for the get topic partitions operation
Return Type
(TopicPartition[] | ConsumerError) Array of partitions for the given topic if the operation succeeds or else a kafka:ConsumerError
pause
(TopicPartition[] partitions)
returns ConsumerError?
Pauses retrieving messages from a set of partitions.
Parameters
- partitions TopicPartition[]: Partitions for which to pause the retrieval of messages
Return Type
(ConsumerError?) A kafka:ConsumerError if an error is encountered or else ()
poll
(int timeoutValue)
returns ConsumerRecord[] | ConsumerError
Polls the consumer for records from an external broker.
kafka:ConsumerRecord[]|kafka:ConsumerError result = consumer->poll(1000);
Parameters
- timeoutValue int: Polling time in milliseconds
Return Type
(ConsumerRecord[] | ConsumerError) Array of consumer records if the operation succeeds or else a kafka:ConsumerError
resume
(TopicPartition[] partitions)
returns ConsumerError?
Resumes retrieving messages from a set of partitions that were paused earlier.
Parameters
- partitions TopicPartition[]: Partitions for which to resume the retrieval of messages
Return Type
(ConsumerError?) A kafka:ConsumerError if an error is encountered or else ()
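Pausing and resuming are typically used as a pair. The sketch below assumes Ballerina 1.x syntax and the `ballerina/kafka` import path. The function name `pauseThenResume` is hypothetical, and the work done between the two calls is left as a placeholder comment.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Hypothetical helper: pauses the given partitions, then resumes them.
function pauseThenResume(kafka:Consumer consumer, kafka:TopicPartition[] partitions) {
    kafka:ConsumerError? pauseResult = consumer->pause(partitions);
    if (pauseResult is kafka:ConsumerError) {
        io:println("Failed to pause the partitions: ", pauseResult);
        return;
    }

    // Placeholder: do any work that should happen while retrieval is paused.

    kafka:ConsumerError? resumeResult = consumer->resume(partitions);
    if (resumeResult is kafka:ConsumerError) {
        io:println("Failed to resume the partitions: ", resumeResult);
    }
}
```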
seek
(PartitionOffset offset)
returns ConsumerError?
Seeks to a given offset in a topic partition.
Parameters
- offset PartitionOffset: The PartitionOffset to seek to
Return Type
(ConsumerError?) A kafka:ConsumerError if an error is encountered or else ()
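A seek to a specific position might be sketched as below, assuming Ballerina 1.x syntax, the `ballerina/kafka` import path, and a `PartitionOffset` record with `partition` and `offset` fields. The function name `seekTo` and its parameters are hypothetical.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Hypothetical helper: moves the consumer's position in one partition.
function seekTo(kafka:Consumer consumer, kafka:TopicPartition topicPartition, int targetOffset) {
    // Assumption: PartitionOffset has the fields `partition` and `offset`.
    kafka:PartitionOffset target = { partition: topicPartition, offset: targetOffset };

    kafka:ConsumerError? result = consumer->seek(target);
    if (result is kafka:ConsumerError) {
        io:println("Failed to seek to the offset: ", result);
    }
}
```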
seekToBeginning
(TopicPartition[] partitions)
returns ConsumerError?
Seeks to the beginning of the offsets for the given set of topic partitions.
Parameters
- partitions TopicPartition[]: The set of topic partitions to seek
Return Type
(ConsumerError?) A kafka:ConsumerError if an error is encountered or else ()
seekToEnd
(TopicPartition[] partitions)
returns ConsumerError?
Seeks to the end of the offsets for the given set of topic partitions.
Parameters
- partitions TopicPartition[]: The set of topic partitions to seek
Return Type
(ConsumerError?) A kafka:ConsumerError if an error is encountered or else ()
subscribe
(string[] topics)
returns ConsumerError?
Subscribes the consumer to the provided set of topics.
kafka:ConsumerError? result = consumer->subscribe(["kafka-topic-1", "kafka-topic-2"]);
Parameters
- topics string[]: Array of topics to be subscribed to
Return Type
(ConsumerError?) A kafka:ConsumerError if an error is encountered or else ()
subscribeToPattern
(string regex)
returns ConsumerError?
Subscribes the consumer to the topics that match the provided pattern.
kafka:ConsumerError? result = consumer->subscribeToPattern("kafka.*");
Parameters
- regex string: Pattern that the topics to be subscribed to must match
Return Type
(ConsumerError?) A kafka:ConsumerError if an error is encountered or else ()
subscribeWithPartitionRebalance
(string[] topics, function(Consumer, TopicPartition[]) returns (()) onPartitionsRevoked, function(Consumer, TopicPartition[]) returns (()) onPartitionsAssigned)
returns ConsumerError?
Subscribes to the provided set of topics with rebalance listening enabled. This function can be used inside a service to subscribe to a set of topics while rebalancing the partition assignment of the consumers.
Parameters
- topics string[]: Array of topics to be subscribed to
- onPartitionsRevoked function(Consumer, TopicPartition[]) returns (()): Function that will be executed if partitions are revoked from this consumer
- onPartitionsAssigned function(Consumer, TopicPartition[]) returns (()): Function that will be executed if partitions are assigned to this consumer
Return Type
(ConsumerError?) A kafka:ConsumerError if an error is encountered or else ()
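The two callbacks can be passed as ordinary function references. The sketch below assumes Ballerina 1.x syntax and the `ballerina/kafka` import path. All function names and the topic name are hypothetical, and the callbacks only print the number of affected partitions.

```ballerina
import ballerina/io;
import ballerina/kafka;

// Hypothetical callback: runs when partitions are revoked from the consumer.
function onRevoked(kafka:Consumer consumer, kafka:TopicPartition[] partitions) {
    io:println("Partitions revoked: ", partitions.length());
}

// Hypothetical callback: runs when partitions are assigned to the consumer.
function onAssigned(kafka:Consumer consumer, kafka:TopicPartition[] partitions) {
    io:println("Partitions assigned: ", partitions.length());
}

// Hypothetical helper: subscribes with both rebalance callbacks registered.
function subscribeWithCallbacks(kafka:Consumer consumer) {
    kafka:ConsumerError? result =
        consumer->subscribeWithPartitionRebalance(["kafka-topic-1"], onRevoked, onAssigned);
    if (result is kafka:ConsumerError) {
        io:println("Failed to subscribe: ", result);
    }
}
```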
unsubscribe
()
returns ConsumerError?
Unsubscribes from all the topic subscriptions.
kafka:ConsumerError? result = consumer->unsubscribe();
Return Type
(ConsumerError?) A kafka:ConsumerError if an error is encountered or else ()
Starts the registered services.
Return Type
(error?) A kafka:ConsumerError if an error is encountered while starting the server or else ()
Stops the Kafka listener.
Return Type
(error?) A kafka:ConsumerError if an error is encountered while stopping the listener or else ()
Stops the Kafka listener.
Return Type
(error?) A kafka:ConsumerError if an error is encountered while stopping the listener or else ()
Gets called every time a service attaches itself to the listener.
Parameters
- s service: The service to be attached
- name string? (default ()): Name of the service
Return Type
(error?) A kafka:ConsumerError if an error is encountered while attaching the service or else ()