Class AbstractParallelEoSStreamProcessor<K,V>
- All Implemented Interfaces:
DrainingCloseable, ParallelConsumer<K,V>, Closeable, AutoCloseable, org.apache.kafka.clients.consumer.ConsumerRebalanceListener
- Direct Known Subclasses:
ExternalEngine, ParallelEoSStreamProcessor
-
Nested Class Summary
Nested classes/interfaces inherited from interface io.confluent.parallelconsumer.internal.DrainingCloseable
DrainingCloseable.DrainingMode
Nested classes/interfaces inherited from interface io.confluent.parallelconsumer.ParallelConsumer
ParallelConsumer.Tuple<L,R>
-
Field Summary
protected final DynamicLoadFactor dynamicExtraLoadFactor: Multiple of ParallelConsumerOptions.getMaxConcurrency() to have in our processing queue, in order to make sure threads always have work to do.
static final String MDC_INSTANCE_ID
static final String MDC_OFFSET_MARKER
protected final ParallelConsumerOptions options
protected final WorkManager<K,V> wm
protected final ThreadPoolExecutor workerThreadPool: The pool used for running the user's supplied function.
Fields inherited from interface io.confluent.parallelconsumer.internal.DrainingCloseable
DEFAULT_TIMEOUT
-
Constructor Summary
AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions): Construct the AsyncConsumer by wrapping the passed-in consumer and producer, which can be configured as normal.
-
Method Summary
void addLoopEndCallBack: Plug in a function to run at the end of each main loop.
protected void addToMailbox(WorkContainer<K, V> wc)
protected void addToMailBoxOnUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction)
protected int calculateQuantityToRequest()
protected void checkPipelinePressure(): Checks that the system has enough pressure in the pipeline of work; if not, attempts to step up the load factor.
void close(): Close the system, without draining.
void close(Duration timeout, DrainingCloseable.DrainingMode drainMode): Close the consumer.
getMyId(): Optional ID of this instance.
int getNumberOfAssignedPartitions()
protected ParallelConsumerOptions getOptions
protected Optional<ProducerManager<K,V>> getProducerManager
protected int getQueueTargetLoaded()
protected int getTargetOutForProcessing()
getTimeBetweenCommits: Time between commits.
getWm()
protected BlockingQueue<io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.ControllerEventMessage<K,V>> getWorkMailBox(): Collection of work waiting to be
boolean isClosedOrFailed()
void notifySomethingToDo(): Early notification that work has arrived.
void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions): Delegate to WorkManager.
void onPartitionsLost(Collection<org.apache.kafka.common.TopicPartition> partitions): Cannot commit any offsets for partitions that have been `lost` (as opposed to revoked).
void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions): Commit our offsets.
protected void onUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction)
void pauseIfRunning(): Pause this consumer (i.e. stop processing of messages).
void registerWork(EpochAndRecordsMap<K, V> polledRecords)
void requestCommitAsap(): Request a commit as soon as possible (ASAP), overriding other constraints.
void resumeIfPaused(): Resume this consumer (i.e. continue processing of messages).
protected <R> List<ParallelConsumer.Tuple<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>, R>> runUserFunction(Function<PollContextInternal<K, V>, List<R>> usersFunction, Consumer<R> callback, List<WorkContainer<K, V>> workContainerBatch): Run the supplied function.
void setLongPollTimeout(Duration ofMillis)
void setMyId: Optional ID of this instance.
void setTimeBetweenCommits(Duration timeBetweenCommits): Time between commits.
protected ThreadPoolExecutor setupWorkerPool(int poolSize)
protected <R> void submitWorkToPool(Function<PollContextInternal<K, V>, List<R>> usersFunction, Consumer<R> callback, List<WorkContainer<K, V>> workToProcess): Submit a piece of work to the processing pool.
void subscribe(Collection<String> topics)
void subscribe(Collection<String> topics, org.apache.kafka.clients.consumer.ConsumerRebalanceListener callback)
void subscribe(Pattern pattern)
void subscribe(Pattern pattern, org.apache.kafka.clients.consumer.ConsumerRebalanceListener callback)
protected <R> void supervisorLoop(Function<PollContextInternal<K, V>, List<R>> userFunctionWrapped, Consumer<R> callback): Supervisor loop for the main loop.
void waitForProcessedNotCommitted(Duration timeout): Deprecated. No longer used; will be removed in the next version.
long workRemaining(): Of the records consumed from the broker, how many remain in our local queues.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface io.confluent.parallelconsumer.internal.DrainingCloseable
closeDontDrainFirst, closeDontDrainFirst, closeDrainFirst, closeDrainFirst
-
Field Details
-
MDC_INSTANCE_ID
-
MDC_OFFSET_MARKER
-
options
-
workerThreadPool
The pool used for running the user's supplied function.
-
wm
-
dynamicExtraLoadFactor
Multiple of ParallelConsumerOptions.getMaxConcurrency() to have in our processing queue, in order to make sure threads always have work to do.
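To make the arithmetic concrete: with a maximum concurrency of 16 and a load factor of 3, the queue would aim to hold 16 * 3 = 48 records, roughly three per worker thread. The helper below is a hypothetical illustration of that rule only; `QueueTargetSketch` and `queueTarget` are not part of the library, and the real `DynamicLoadFactor` may derive its value differently.

```java
// Hypothetical helper illustrating the "multiple of max concurrency" rule;
// it is NOT the library's DynamicLoadFactor class.
final class QueueTargetSketch {
    private QueueTargetSketch() {
    }

    /**
     * Records to keep in the processing queue so that worker threads always
     * have work: maxConcurrency multiplied by the current load factor.
     */
    static int queueTarget(int maxConcurrency, int loadFactor) {
        if (maxConcurrency < 1 || loadFactor < 1) {
            throw new IllegalArgumentException("maxConcurrency and loadFactor must both be >= 1");
        }
        // multiplyExact fails loudly instead of silently overflowing
        return Math.multiplyExact(maxConcurrency, loadFactor);
    }
}
```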
-
-
Constructor Details
-
AbstractParallelEoSStreamProcessor
Construct the AsyncConsumer by wrapping the passed-in consumer and producer, which can be configured as normal.
-
-
Method Details
-
isClosedOrFailed
public boolean isClosedOrFailed() -
getFailureCause
- Returns:
- if the system failed, returns the recorded reason.
-
setupWorkerPool
-
subscribe
- Specified by:
subscribe in interface ParallelConsumer<K,V>
- See Also:
-
KafkaConsumer.subscribe(Collection)
-
subscribe
- Specified by:
subscribe in interface ParallelConsumer<K,V>
- See Also:
-
KafkaConsumer.subscribe(Pattern)
-
subscribe
public void subscribe(Collection<String> topics, org.apache.kafka.clients.consumer.ConsumerRebalanceListener callback)
- Specified by:
subscribe in interface ParallelConsumer<K,V>
- See Also:
-
KafkaConsumer.subscribe(Collection, ConsumerRebalanceListener)
-
subscribe
public void subscribe(Pattern pattern, org.apache.kafka.clients.consumer.ConsumerRebalanceListener callback)
- Specified by:
subscribe in interface ParallelConsumer<K,V>
- See Also:
-
KafkaConsumer.subscribe(Pattern, ConsumerRebalanceListener)
-
onPartitionsRevoked
Commit our offsets.
Make sure the calling thread is the thread which performs the commit, i.e. is the OffsetCommitter.
- Specified by:
onPartitionsRevoked in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
-
onPartitionsAssigned
Delegate to WorkManager.
- Specified by:
onPartitionsAssigned in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
-
onPartitionsLost
Cannot commit any offsets for partitions that have been `lost` (as opposed to revoked). Just delegate to WorkManager for truncation.
- Specified by:
onPartitionsLost in interface org.apache.kafka.clients.consumer.ConsumerRebalanceListener
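The difference between the three rebalance callbacks can be sketched without Kafka on the classpath. This is a hypothetical, simplified model, not the library's implementation: partitions are plain strings, a map holds processed offsets, and a `commitLog` list stands in for real commits. A revoked partition is still owned at callback time, so its progress is committed before release; a lost partition is no longer owned by this consumer, so local state is only dropped.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of assigned / revoked / lost handling. A real
// implementation would implement org.apache.kafka.clients.consumer.ConsumerRebalanceListener.
final class RebalanceSketch {
    final Map<String, Long> processedOffsets = new HashMap<>();
    final List<String> commitLog = new ArrayList<>(); // stand-in for actual commits

    void onPartitionsAssigned(Collection<String> partitions) {
        // Start tracking state for newly assigned partitions
        for (String p : partitions) {
            processedOffsets.putIfAbsent(p, 0L);
        }
    }

    void onPartitionsRevoked(Collection<String> partitions) {
        // Still the owner: commit progress, then forget the partition
        for (String p : partitions) {
            Long offset = processedOffsets.remove(p);
            if (offset != null) {
                commitLog.add(p + "@" + offset);
            }
        }
    }

    void onPartitionsLost(Collection<String> partitions) {
        // Ownership is already gone: no commit, only drop local state
        for (String p : partitions) {
            processedOffsets.remove(p);
        }
    }
}
```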
-
close
public void close()
Close the system, without draining.
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface Closeable
- Specified by:
close in interface DrainingCloseable
-
close
Description copied from interface: DrainingCloseable
Close the consumer.
- Specified by:
close in interface DrainingCloseable
- Parameters:
timeout - how long to wait before giving up
drainMode - wait for messages already consumed from the broker to be processed before closing
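The two closing strategies can be illustrated with a plain `ExecutorService`. This is a hypothetical sketch: `DrainingCloseSketch` and its local `DrainMode` enum only mirror the idea of `DrainingCloseable.DrainingMode`, and the real close path does more (committing offsets, stopping the broker poller). Draining lets already-submitted work finish; not draining interrupts running tasks and discards queued ones.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of draining vs. non-draining shutdown of a worker pool.
final class DrainingCloseSketch {
    enum DrainMode { DRAIN, DONT_DRAIN }

    private DrainingCloseSketch() {
    }

    /**
     * Closes the pool. DRAIN lets submitted work finish; DONT_DRAIN interrupts
     * running tasks and drops queued ones.
     *
     * @return true if the pool terminated within the timeout
     */
    static boolean close(ExecutorService pool, Duration timeout, DrainMode mode)
            throws InterruptedException {
        if (mode == DrainMode.DRAIN) {
            pool.shutdown(); // accept no new tasks, but run everything already queued
        } else {
            pool.shutdownNow(); // interrupt workers and return queued tasks unrun
        }
        return pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```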
-
waitForProcessedNotCommitted
Deprecated. No longer used; will be removed in the next version.
Block the calling thread until no more messages are being processed. Used for testing.
-
supervisorLoop
protected <R> void supervisorLoop(Function<PollContextInternal<K, V>, List<R>> userFunctionWrapped, Consumer<R> callback)
Supervisor loop for the main loop.
-
submitWorkToPool
protected <R> void submitWorkToPool(Function<PollContextInternal<K, V>, List<R>> usersFunction, Consumer<R> callback, List<WorkContainer<K, V>> workToProcess)
Submit a piece of work to the processing pool.
- Parameters:
workToProcess - the polled records to process
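A self-contained sketch of the submit step, assuming a plain `ExecutorService` as the worker pool and a `BlockingQueue` as the controller's mailbox. The generic `T` stands in for `WorkContainer<K, V>`, and a `Function<List<T>, List<R>>` stands in for the user function over a `PollContextInternal`; `SubmitWorkSketch` and its `mailbox` field are hypothetical names, not the library's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch: a batch is handed to the worker pool; each result goes to
// the callback and is then posted to a mailbox that the controller thread drains.
final class SubmitWorkSketch<T, R> {
    final BlockingQueue<R> mailbox = new LinkedBlockingQueue<>();
    private final ExecutorService pool;

    SubmitWorkSketch(ExecutorService pool) {
        this.pool = pool;
    }

    Future<?> submitWorkToPool(Function<List<T>, List<R>> usersFunction,
                               Consumer<R> callback,
                               List<T> workToProcess) {
        // Copy the batch so later changes to the caller's list cannot affect the task
        List<T> batch = new ArrayList<>(workToProcess);
        return pool.submit(() -> {
            for (R result : usersFunction.apply(batch)) {
                callback.accept(result);
                mailbox.add(result); // hand the result back to the controller
            }
        });
    }
}
```

Returning the `Future` is only a convenience here so a caller (or test) can wait for the batch; the controller in this sketch learns about completion through the mailbox.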
-
calculateQuantityToRequest
protected int calculateQuantityToRequest()
- Returns:
- the number of WorkContainers to try to get
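One plausible reading of "number of WorkContainers to try to get" is the shortfall between a target and the work already outstanding. The sketch below assumes exactly that; `QuantityToRequestSketch`, `target`, and `outstanding` are illustrative names, and the real method may weigh other factors.

```java
// Hypothetical sketch: request only the shortfall, never a negative amount.
final class QuantityToRequestSketch {
    private QuantityToRequestSketch() {
    }

    /**
     * @param target      desired number of records queued or in flight
     * @param outstanding records already queued for, or running in, the pool
     * @return how many more work items to request (0 if already at or above target)
     */
    static int quantityToRequest(int target, int outstanding) {
        return Math.max(0, target - outstanding);
    }
}
```

For example, a target of 48 with 30 records outstanding yields a request for 18 more; with 60 outstanding, nothing is requested.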
-
getTargetOutForProcessing
protected int getTargetOutForProcessing() -
getQueueTargetLoaded
protected int getQueueTargetLoaded() -
checkPipelinePressure
protected void checkPipelinePressure()
Checks that the system has enough pressure in the pipeline of work; if not, attempts to step up the load factor.
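A minimal sketch of the pressure check, assuming "not enough pressure" means fewer records queued than `maxConcurrency * loadFactor`, and that the factor steps up by one until it reaches a cap. `PipelinePressureSketch`, the step size, and the cap are assumptions made for illustration, not the library's policy.

```java
// Hypothetical sketch of stepping up a load factor when the pipeline is starved.
final class PipelinePressureSketch {
    private final int maxConcurrency;
    private final int maxLoadFactor;
    private int loadFactor = 1;

    PipelinePressureSketch(int maxConcurrency, int maxLoadFactor) {
        this.maxConcurrency = maxConcurrency;
        this.maxLoadFactor = maxLoadFactor;
    }

    int loadFactor() {
        return loadFactor;
    }

    /** Steps the load factor up by one when fewer records are queued than the current target. */
    void checkPipelinePressure(int queuedRecords) {
        int target = maxConcurrency * loadFactor;
        boolean starved = queuedRecords < target;
        if (starved && loadFactor < maxLoadFactor) {
            loadFactor++;
        }
    }
}
```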
runUserFunction
protected <R> List<ParallelConsumer.Tuple<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>, R>> runUserFunction(Function<PollContextInternal<K, V>, List<R>> usersFunction, Consumer<R> callback, List<WorkContainer<K, V>> workContainerBatch)
Run the supplied function.
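A simplified, hypothetical sketch of running a user function and pairing each result with its input, in the spirit of the `Tuple<ConsumerRecord<K,V>, R>` return type. It assumes a per-record function (`Function<T, List<R>>`) rather than the real `Function<PollContextInternal<K, V>, List<R>>` over a whole batch, and a local `Pair` class replaces `ParallelConsumer.Tuple`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch; Pair stands in for ParallelConsumer.Tuple and T for ConsumerRecord<K,V>.
final class RunUserFunctionSketch {
    private RunUserFunctionSketch() {
    }

    /** Minimal stand-in for ParallelConsumer.Tuple<L,R>. */
    static final class Pair<L, R> {
        final L left;
        final R right;

        Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }
    }

    /**
     * Applies the user's function to each record, hands every result to the
     * callback, and returns (record, result) pairs in processing order.
     */
    static <T, R> List<Pair<T, R>> runUserFunction(Function<T, List<R>> usersFunction,
                                                   Consumer<R> callback,
                                                   List<T> batch) {
        List<Pair<T, R>> pairs = new ArrayList<>();
        for (T record : batch) {
            for (R result : usersFunction.apply(record)) {
                callback.accept(result);
                pairs.add(new Pair<>(record, result));
            }
        }
        return pairs;
    }
}
```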
addToMailBoxOnUserFunctionSuccess
protected void addToMailBoxOnUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction) -
onUserFunctionSuccess
-
addToMailbox
-
registerWork
-
notifySomethingToDo
public void notifySomethingToDo()
Early notification that work has arrived. Only wakes up the thread if it is sleeping while polling the mailbox.
- See Also:
processWorkCompleteMailBox(), blockableControlThread
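The "only wake it if it is sleeping" behaviour can be sketched with a mailbox queue and a flag. This is a hypothetical model, not the library's code: the control thread sleeps by blocking in `BlockingQueue.poll(timeout, unit)`, and `notifySomethingToDo()` posts a wake-up message only while that thread is waiting, so a busy loop is not flooded with redundant messages. `MailboxWakeSketch`, `WAKE_UP`, and `waitingOnMailbox` are illustrative names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of waking a control thread that sleeps on its mailbox.
final class MailboxWakeSketch {
    static final String WAKE_UP = "WAKE_UP";

    final BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
    private final AtomicBoolean waitingOnMailbox = new AtomicBoolean(false);

    /** Called by the control thread; returns the next message, or null on timeout. */
    String pollMailbox(long timeout, TimeUnit unit) throws InterruptedException {
        waitingOnMailbox.set(true);
        try {
            return mailbox.poll(timeout, unit);
        } finally {
            waitingOnMailbox.set(false);
        }
    }

    /** Called from other threads when new work has arrived. */
    void notifySomethingToDo() {
        // compareAndSet guarantees at most one wake-up message per wait
        if (waitingOnMailbox.compareAndSet(true, false)) {
            mailbox.add(WAKE_UP);
        }
    }
}
```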
-
workRemaining
public long workRemaining()
Description copied from interface: DrainingCloseable
Of the records consumed from the broker, how many remain in our local queues.
- Specified by:
workRemaining in interface DrainingCloseable
- Returns:
- the number of consumed but outstanding records to process
-
addLoopEndCallBack
Plug in a function to run at the end of each main loop. Useful for testing and controlling loop progression.
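A hypothetical sketch of a loop-end hook, assuming callbacks are plain `Runnable`s (the real parameter type is not shown here). Each registered callback runs after every iteration, which is what lets a test count or pace loop progress. `MainLoopSketch` and `runLoop` are illustrative names; a real control loop would run until closed rather than a fixed number of times.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch: callbacks run at the end of every main-loop iteration.
final class MainLoopSketch {
    // Copy-on-write so callbacks may be registered from another thread while the loop runs
    private final List<Runnable> loopEndCallbacks = new CopyOnWriteArrayList<>();
    private int iterations;

    void addLoopEndCallBack(Runnable callback) {
        loopEndCallbacks.add(callback);
    }

    /** Runs a fixed number of iterations, invoking all loop-end callbacks after each one. */
    void runLoop(int times) {
        for (int i = 0; i < times; i++) {
            iterations++; // placeholder for the poll / dispatch / commit work of one iteration
            loopEndCallbacks.forEach(Runnable::run);
        }
    }

    int iterations() {
        return iterations;
    }
}
```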
-
setLongPollTimeout
-
requestCommitAsap
public void requestCommitAsap()
Request a commit as soon as possible (ASAP), overriding other constraints.
-
pauseIfRunning
public void pauseIfRunning()
Description copied from interface: ParallelConsumer
Pause this consumer (i.e. stop processing of messages). This operation only has an effect if the consumer is currently running. In all other cases, calling this method will be a silent no-op.
Once the consumer is paused, the system will stop submitting work to the processing pool. Work that is already in flight, however, will be finished. This includes work that is currently being processed inside a user function as well as work that has already been submitted to the processing pool but has not yet been picked up by a free worker.
General remarks:
- A paused consumer may still keep polling for new work until internal buffers are filled.
- This operation does not actively pause the subscription on the underlying Kafka broker (compared to KafkaConsumer#pause).
- Pending offset commits will still be performed when the consumer is paused.
- Specified by:
pauseIfRunning in interface ParallelConsumer<K,V>
-
resumeIfPaused
public void resumeIfPaused()
Description copied from interface: ParallelConsumer
Resume this consumer (i.e. continue processing of messages). This operation only has an effect if the consumer is currently paused. In all other cases, calling this method will be a silent no-op.
- Specified by:
resumeIfPaused in interface ParallelConsumer<K,V>
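The "only acts in one state, silent no-op otherwise" contract of pauseIfRunning and resumeIfPaused can be sketched as a tiny state machine. `PauseResumeSketch`, its `State` enum, and `shouldSubmitWork` are hypothetical; the sketch models only when new work is handed to the pool, not the polling or commit behaviour described above.

```java
// Hypothetical state machine for pause/resume semantics; not the library's implementation.
final class PauseResumeSketch {
    enum State { RUNNING, PAUSED, CLOSED }

    private State state = State.RUNNING;

    synchronized void pauseIfRunning() {
        if (state == State.RUNNING) {
            state = State.PAUSED; // stop handing new work to the pool; in-flight work still finishes
        }
    }

    synchronized void resumeIfPaused() {
        if (state == State.PAUSED) {
            state = State.RUNNING;
        }
    }

    synchronized void close() {
        state = State.CLOSED;
    }

    synchronized State state() {
        return state;
    }

    /** New work is only submitted to the pool while running. */
    synchronized boolean shouldSubmitWork() {
        return state == State.RUNNING;
    }
}
```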
-
getOptions
-
setTimeBetweenCommits
Time between commits. Using a higher frequency will put more load on the brokers. -
getTimeBetweenCommits
Time between commits. Using a higher frequency will put more load on the brokers.
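How the commit interval and requestCommitAsap() could interact is sketched below as a small throttle: a commit becomes due once the interval has elapsed since the last commit, or immediately after an ASAP request, which overrides the timer. `CommitTimerSketch`, `isCommitDue`, and `onCommitted` are hypothetical names, and time is passed in explicitly to keep the sketch deterministic. A shorter interval makes `isCommitDue` true more often, which is the extra broker load the description above warns about.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical commit throttle: interval-based, with an ASAP override.
final class CommitTimerSketch {
    private Duration timeBetweenCommits;
    private Instant lastCommit;
    private boolean commitRequested;

    CommitTimerSketch(Duration timeBetweenCommits, Instant start) {
        this.timeBetweenCommits = timeBetweenCommits;
        this.lastCommit = start;
    }

    void setTimeBetweenCommits(Duration timeBetweenCommits) {
        this.timeBetweenCommits = timeBetweenCommits;
    }

    /** Overrides the interval: the next check reports a commit as due. */
    void requestCommitAsap() {
        commitRequested = true;
    }

    boolean isCommitDue(Instant now) {
        Duration elapsed = Duration.between(lastCommit, now);
        return commitRequested || elapsed.compareTo(timeBetweenCommits) >= 0;
    }

    /** Records a completed commit and clears any pending ASAP request. */
    void onCommitted(Instant now) {
        lastCommit = now;
        commitRequested = false;
    }
}
```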
getProducerManager
-
getWm
-
getWorkMailBox
protected BlockingQueue<io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.ControllerEventMessage<K,V>> getWorkMailBox()
Collection of work waiting to be
-
getNumberOfAssignedPartitions
public int getNumberOfAssignedPartitions() -
setMyId
Optional ID of this instance. Useful for testing. -
getMyId
Optional ID of this instance. Useful for testing.
-