Class WorkManager<K,V>
java.lang.Object
io.confluent.parallelconsumer.state.WorkManager<K,V>
- Type Parameters:
K -
V -
- All Implemented Interfaces:
org.apache.kafka.clients.consumer.ConsumerRebalanceListener
public class WorkManager<K,V>
extends Object
implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener
Sharded, prioritised, offset managed, order controlled, delayed work queue.
Low Water Mark - the highest offset for which it and all previous messages have succeeded, i.e. the top of the contiguous run of successes (the offset one commits to the broker)
High Water Mark - the highest offset which has succeeded (previous may be incomplete)
Highest seen offset - the highest ever seen offset
This state is shared between the BrokerPollSystem thread and the AbstractParallelEoSStreamProcessor.
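The three offset markers above can be illustrated with a small, self-contained sketch. `WaterMarkSketch`, `lowWaterMark`, `highWaterMark` and the `baseOffset` parameter are hypothetical names chosen for illustration; they are not part of `WorkManager`, and the library's own bookkeeping may differ.

```java
import java.util.List;
import java.util.OptionalLong;
import java.util.TreeSet;

// Hypothetical helper, not part of WorkManager: illustrates the three offset markers.
final class WaterMarkSketch {

    // Highest offset such that it and every earlier offset (starting at baseOffset)
    // has succeeded. Empty if baseOffset itself has not succeeded yet.
    static OptionalLong lowWaterMark(long baseOffset, TreeSet<Long> succeeded) {
        long next = baseOffset;
        while (succeeded.contains(next)) {
            next++;
        }
        return next == baseOffset ? OptionalLong.empty() : OptionalLong.of(next - 1);
    }

    // Highest offset that has succeeded, regardless of gaps below it.
    static OptionalLong highWaterMark(TreeSet<Long> succeeded) {
        return succeeded.isEmpty() ? OptionalLong.empty() : OptionalLong.of(succeeded.last());
    }

    public static void main(String[] args) {
        // Assumed scenario: offsets 0..6 have been seen; 3 has not succeeded yet,
        // and 5 and 6 are still outstanding.
        TreeSet<Long> succeeded = new TreeSet<>(List.of(0L, 1L, 2L, 4L));
        long highestSeen = 6L;

        System.out.println("low water mark:  " + lowWaterMark(0L, succeeded));
        System.out.println("high water mark: " + highWaterMark(succeeded));
        System.out.println("highest seen:    " + highestSeen);
    }
}
```

In this scenario the low water mark is 2 (offset 3 breaks the contiguous run), the high water mark is 4, and the highest seen offset is 6.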
-
Constructor Summary
Constructors
Constructor and Description
WorkManager(ParallelConsumerOptions<K, V> options, org.apache.kafka.clients.consumer.Consumer<K, V> consumer)
WorkManager(ParallelConsumerOptions<K, V> newOptions, org.apache.kafka.clients.consumer.Consumer<K, V> consumer, DynamicLoadFactor dynamicExtraLoadFactor, Clock clock)
WorkManager(ParallelConsumerOptions<K, V> options, org.apache.kafka.clients.consumer.Consumer<K, V> consumer, Clock clock)
Use a private DynamicLoadFactor, useful for testing.
-
Method Summary
Modifier and Type, Method, Description
boolean checkIfWorkIsStale(WorkContainer<K, V> workContainer)
Have our partitions been revoked?
boolean checkIfWorkIsStale(List<WorkContainer<K, V>> workContainers)
Have our partitions been revoked? Can a batch contain messages of different epochs?
Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> collectCommitDataForDirtyPartitions()
getLowestRetryTime
long getNumberOfEntriesInPartitionQueues()
long getNumberOfWorkQueuedInShardsAwaitingSelection()
int getNumberRecordsOutForProcessing()
getOptions
getPm()
getSm()
List<Consumer<WorkContainer<K,V>>> getSuccessfulWorkListeners
Useful for testing
List<WorkContainer<K,V>> getWorkIfAvailable
Get work with no limit on quantity, useful for testing.
List<WorkContainer<K,V>> getWorkIfAvailable(int requestedMaxWorkToRetrieve)
Depth first work retrieval.
void handleFutureResult
boolean hasWorkInCommitQueues()
boolean hasWorkInFlight()
boolean isNoRecordsOutForProcessing()
boolean isRecordsAwaitingProcessing()
boolean isRecordsAwaitingToBeCommitted()
boolean isSufficientlyLoaded()
boolean isWorkInFlightMeetingTarget()
void onFailureResult(WorkContainer<K, V> wc)
void onOffsetCommitSuccess(Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> committed)
Can run from controller or poller thread, depending on which is responsible for committing
void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions)
Load offset map for assigned partitions
void onPartitionsLost(Collection<org.apache.kafka.common.TopicPartition> partitions)
Clear offset map for lost partitions
void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions)
Clear offset map for revoked partitions
void onSuccessResult(WorkContainer<K, V> wc)
void registerWork(EpochAndRecordsMap<K, V> records)
boolean shouldThrottle()
boolean workIsWaitingToBeProcessed()
-
Constructor Details
-
WorkManager
public WorkManager(ParallelConsumerOptions<K, V> options, org.apache.kafka.clients.consumer.Consumer<K, V> consumer) -
WorkManager
public WorkManager(ParallelConsumerOptions<K, V> options, org.apache.kafka.clients.consumer.Consumer<K, V> consumer, Clock clock)
Use a private DynamicLoadFactor, useful for testing. -
WorkManager
public WorkManager(ParallelConsumerOptions<K, V> newOptions, org.apache.kafka.clients.consumer.Consumer<K, V> consumer, DynamicLoadFactor dynamicExtraLoadFactor, Clock clock)
-
-
Method Details
-
onPartitionsAssigned
Load offset map for assigned partitions
- Specified by:
onPartitionsAssigned in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
-
onPartitionsRevoked
Clear offset map for revoked partitions
AbstractParallelEoSStreamProcessor.onPartitionsRevoked(java.util.Collection<org.apache.kafka.common.TopicPartition>) handles committing offsets upon revoke
- Specified by:
onPartitionsRevoked in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
- See Also:
-
onPartitionsLost
Clear offset map for lost partitions
- Specified by:
onPartitionsLost in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
-
registerWork
-
getWorkIfAvailable
Get work with no limit on quantity, useful for testing. -
getWorkIfAvailable
Depth first work retrieval. -
onSuccessResult
-
onOffsetCommitSuccess
public void onOffsetCommitSuccess(Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> committed)
Can run from controller or poller thread, depending on which is responsible for committing -
onFailureResult
-
getNumberOfEntriesInPartitionQueues
public long getNumberOfEntriesInPartitionQueues() -
collectCommitDataForDirtyPartitions
public Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> collectCommitDataForDirtyPartitions() -
checkIfWorkIsStale
Have our partitions been revoked? Can a batch contain messages of different epochs?
- Returns:
- true if any epoch is stale, false if not
- See Also:
-
checkIfWorkIsStale
Have our partitions been revoked?
- Returns:
- true if epoch doesn't match, false if ok
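One way to picture the two staleness checks is an assignment epoch per partition that is bumped on every (re)assignment, with each unit of work remembering the epoch it was registered under. The sketch below is an assumption-laden illustration only: `EpochStalenessSketch`, its nested `Work` class, and the `String` partition keys are hypothetical stand-ins, not the library's `WorkContainer` or partition state.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the staleness check; none of these types are the library's own.
final class EpochStalenessSketch {

    // Minimal stand-in for a unit of work: the partition it came from and
    // the assignment epoch that was current when it was registered.
    static final class Work {
        final String partition;
        final long epoch;

        Work(String partition, long epoch) {
            this.partition = partition;
            this.epoch = epoch;
        }
    }

    // Epoch counters survive revocation, so a re-assignment always gets a new epoch.
    private final Map<String, Long> epochs = new HashMap<>();
    private final Set<String> assigned = new HashSet<>();

    // Marks the partition as assigned and returns its new epoch.
    long onPartitionAssigned(String partition) {
        assigned.add(partition);
        return epochs.merge(partition, 1L, Long::sum);
    }

    void onPartitionRevoked(String partition) {
        assigned.remove(partition);
    }

    // True if the partition is no longer assigned or the epoch doesn't match.
    boolean checkIfWorkIsStale(Work work) {
        if (!assigned.contains(work.partition)) {
            return true;
        }
        return epochs.get(work.partition).longValue() != work.epoch;
    }

    // True if any work item in the batch is stale.
    boolean checkIfWorkIsStale(List<Work> batch) {
        for (Work work : batch) {
            if (checkIfWorkIsStale(work)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        EpochStalenessSketch state = new EpochStalenessSketch();

        Work first = new Work("topic-0", state.onPartitionAssigned("topic-0"));
        System.out.println(state.checkIfWorkIsStale(first));   // false: epoch matches

        state.onPartitionRevoked("topic-0");
        System.out.println(state.checkIfWorkIsStale(first));   // true: partition revoked

        Work second = new Work("topic-0", state.onPartitionAssigned("topic-0"));
        System.out.println(state.checkIfWorkIsStale(List.of(first, second))); // true: first is from an old epoch
        System.out.println(state.checkIfWorkIsStale(second));  // false: current epoch
    }
}
```

Keeping the epoch counter after revocation is the design choice that matters here: work registered before a revoke and re-assign cycle then carries an older epoch and is detected as stale even though the partition is assigned again.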
-
shouldThrottle
public boolean shouldThrottle() -
isSufficientlyLoaded
public boolean isSufficientlyLoaded()
- Returns:
- true if enough messages have already been downloaded from the broker to satisfy the pipeline, false if more should be downloaded (or pipelined in the Consumer)
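This page does not document how "enough" is measured. As a rough illustration only, a check of this kind could compare the number of queued records against a target derived from the desired concurrency and a load factor. Everything in the sketch below (`LoadCheckSketch`, the parameter names, and the formula itself) is an assumption, not the library's implementation.

```java
// Hypothetical sketch; the real check and its inputs are not documented on this page.
final class LoadCheckSketch {

    // Assumed rule: the pipeline is "sufficiently loaded" once the number of
    // queued records reaches targetConcurrency * loadFactor.
    static boolean isSufficientlyLoaded(long queuedRecords, int targetConcurrency, int loadFactor) {
        long target = (long) targetConcurrency * loadFactor;
        return queuedRecords >= target;
    }

    public static void main(String[] args) {
        // With 16 workers and a load factor of 2, the assumed target is 32 queued records.
        System.out.println(isSufficientlyLoaded(10, 16, 2)); // below target: keep polling
        System.out.println(isSufficientlyLoaded(40, 16, 2)); // at or above target: polling can pause
    }
}
```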
-
workIsWaitingToBeProcessed
public boolean workIsWaitingToBeProcessed() -
hasWorkInFlight
public boolean hasWorkInFlight() -
isWorkInFlightMeetingTarget
public boolean isWorkInFlightMeetingTarget() -
getNumberOfWorkQueuedInShardsAwaitingSelection
public long getNumberOfWorkQueuedInShardsAwaitingSelection() -
hasWorkInCommitQueues
public boolean hasWorkInCommitQueues() -
isRecordsAwaitingProcessing
public boolean isRecordsAwaitingProcessing() -
isRecordsAwaitingToBeCommitted
public boolean isRecordsAwaitingToBeCommitted() -
handleFutureResult
-
isNoRecordsOutForProcessing
public boolean isNoRecordsOutForProcessing() -
getLowestRetryTime
-
getOptions
-
getPm
-
getSm
-
getNumberRecordsOutForProcessing
public int getNumberRecordsOutForProcessing() -
getSuccessfulWorkListeners
Useful for testing
-