Class WorkManager<K,V>

java.lang.Object
io.confluent.parallelconsumer.state.WorkManager<K,V>
Type Parameters:
K -
V -
All Implemented Interfaces:
org.apache.kafka.clients.consumer.ConsumerRebalanceListener

public class WorkManager<K,V> extends Object implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener
Sharded, prioritised, offset managed, order controlled, delayed work queue.

Low Water Mark - the highest offset (continuously successful) with all it's previous messages succeeded (the offset one commits to broker)

High Water Mark - the highest offset which has succeeded (previous may be incomplete)

Highest seen offset - the highest ever seen offset

This state is shared between the BrokerPollSystem thread and the AbstractParallelEoSStreamProcessor.

  • Constructor Details

  • Method Details

    • onPartitionsAssigned

      public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions)
      Load offset map for assigned partitions
      Specified by:
      onPartitionsAssigned in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
    • onPartitionsRevoked

      public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions)
      Clear offset map for revoked partitions

      AbstractParallelEoSStreamProcessor.onPartitionsRevoked(java.util.Collection<org.apache.kafka.common.TopicPartition>) handles committing off offsets upon revoke

      Specified by:
      onPartitionsRevoked in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
      See Also:
    • onPartitionsLost

      public void onPartitionsLost(Collection<org.apache.kafka.common.TopicPartition> partitions)
      Clear offset map for lost partitions
      Specified by:
      onPartitionsLost in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
    • registerWork

      public void registerWork(EpochAndRecordsMap<K,V> records)
    • getWorkIfAvailable

      public List<WorkContainer<K,V>> getWorkIfAvailable()
      Get work with no limit on quantity, useful for testing.
    • getWorkIfAvailable

      public List<WorkContainer<K,V>> getWorkIfAvailable(int requestedMaxWorkToRetrieve)
      Depth first work retrieval.
    • onSuccessResult

      public void onSuccessResult(WorkContainer<K,V> wc)
    • onOffsetCommitSuccess

      public void onOffsetCommitSuccess(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> committed)
      Can run from controller or poller thread, depending on which is responsible for committing
      See Also:
    • onFailureResult

      public void onFailureResult(WorkContainer<K,V> wc)
    • getNumberOfEntriesInPartitionQueues

      public long getNumberOfEntriesInPartitionQueues()
    • collectCommitDataForDirtyPartitions

      public Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> collectCommitDataForDirtyPartitions()
    • checkIfWorkIsStale

      public boolean checkIfWorkIsStale(List<WorkContainer<K,V>> workContainers)
      Have our partitions been revoked? Can a batch contain messages of different epochs?
      Returns:
      true if any epoch is stale, false if not
      See Also:
    • checkIfWorkIsStale

      public boolean checkIfWorkIsStale(WorkContainer<K,V> workContainer)
      Have our partitions been revoked?
      Returns:
      true if epoch doesn't match, false if ok
    • shouldThrottle

      public boolean shouldThrottle()
    • isSufficientlyLoaded

      public boolean isSufficientlyLoaded()
      Returns:
      true if there's enough messages downloaded from the broker already to satisfy the pipeline, false if more should be downloaded (or pipelined in the Consumer)
    • workIsWaitingToBeProcessed

      public boolean workIsWaitingToBeProcessed()
    • hasWorkInFlight

      public boolean hasWorkInFlight()
    • isWorkInFlightMeetingTarget

      public boolean isWorkInFlightMeetingTarget()
    • getNumberOfWorkQueuedInShardsAwaitingSelection

      public long getNumberOfWorkQueuedInShardsAwaitingSelection()
    • hasWorkInCommitQueues

      public boolean hasWorkInCommitQueues()
    • isRecordsAwaitingProcessing

      public boolean isRecordsAwaitingProcessing()
    • isRecordsAwaitingToBeCommitted

      public boolean isRecordsAwaitingToBeCommitted()
    • handleFutureResult

      public void handleFutureResult(WorkContainer<K,V> wc)
    • isNoRecordsOutForProcessing

      public boolean isNoRecordsOutForProcessing()
    • getLowestRetryTime

      public Optional<Duration> getLowestRetryTime()
    • getOptions

      public ParallelConsumerOptions<K,V> getOptions()
    • getPm

      public PartitionStateManager<K,V> getPm()
    • getSm

      public ShardManager<K,V> getSm()
    • getNumberRecordsOutForProcessing

      public int getNumberRecordsOutForProcessing()
    • getSuccessfulWorkListeners

      public List<Consumer<WorkContainer<K,V>>> getSuccessfulWorkListeners()
      Useful for testing