/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import io.netty.util.Recycler;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.exception.TransactionException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcherSingleActiveConsumer
implements Dispatcher,
AsyncCallbacks.ReadEntriesCallback {
    private final AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
    protected final PersistentTopic topic;
    protected final Executor topicExecutor;
    protected final String name;
    private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
    protected volatile boolean havePendingRead = false;
    protected volatile int readBatchSize;
    protected final Backoff readFailureBackoff;
    private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
    private final RedeliveryTracker redeliveryTracker;
    private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);

    public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, CommandSubscribe.SubType subscriptionType, int partitionIndex, PersistentTopic topic, Subscription subscription) {
        super(subscriptionType, partitionIndex, topic.getName(), subscription, topic.getBrokerService().pulsar().getConfiguration(), cursor);
        this.topic = topic;
        this.topicExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread((Object)this.topicName);
        this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode((String)cursor.getName()) : "");
        this.readBatchSize = this.serviceConfig.getDispatcherMaxReadBatchSize();
        this.readFailureBackoff = new Backoff((long)this.serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS, (long)this.serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(), TimeUnit.MILLISECONDS, (long)this.serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs(), TimeUnit.MILLISECONDS);
        this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
        this.initializeDispatchRateLimiterIfNeeded();
    }

    @Override
    protected void scheduleReadOnActiveConsumer() {
        this.cancelPendingRead();
        if (this.havePendingRead) {
            return;
        }
        if (this.subscriptionType != CommandSubscribe.SubType.Failover || this.serviceConfig.getActiveConsumerFailoverDelayTimeMillis() <= 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Rewind cursor and read more entries without delay", (Object)this.name);
            }
            this.cursor.rewind();
            Consumer activeConsumer = (Consumer)ACTIVE_CONSUMER_UPDATER.get(this);
            this.notifyActiveConsumerChanged(activeConsumer);
            this.readMoreEntries(activeConsumer);
            return;
        }
        if (this.readOnActiveConsumerTask != null) {
            return;
        }
        this.readOnActiveConsumerTask = this.topic.getBrokerService().executor().schedule(() -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Rewind cursor and read more entries after {} ms delay", (Object)this.name, (Object)this.serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
            }
            this.cursor.rewind();
            Consumer activeConsumer = (Consumer)ACTIVE_CONSUMER_UPDATER.get(this);
            this.notifyActiveConsumerChanged(activeConsumer);
            this.readMoreEntries(activeConsumer);
            this.readOnActiveConsumerTask = null;
        }, (long)this.serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    protected boolean isConsumersExceededOnSubscription() {
        return this.isConsumersExceededOnSubscription(this.topic, this.consumers.size());
    }

    @Override
    protected void cancelPendingRead() {
        if (this.havePendingRead && this.cursor.cancelPendingReadRequest()) {
            this.havePendingRead = false;
        }
    }

    public void readEntriesComplete(List<Entry> entries, Object obj) {
        this.topicExecutor.execute(() -> this.internalReadEntriesComplete(entries, obj));
    }

    public synchronized void internalReadEntriesComplete(List<Entry> entries, Object obj) {
        ReadEntriesCtx readEntriesCtx = (ReadEntriesCtx)obj;
        Consumer readConsumer = readEntriesCtx.getConsumer();
        long epoch = readEntriesCtx.getEpoch();
        readEntriesCtx.recycle();
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Got messages: {}", new Object[]{this.name, readConsumer, entries.size()});
        }
        this.havePendingRead = false;
        this.isFirstRead = false;
        if (this.readBatchSize < this.serviceConfig.getDispatcherMaxReadBatchSize()) {
            int newReadBatchSize = Math.min(this.readBatchSize * 2, this.serviceConfig.getDispatcherMaxReadBatchSize());
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Increasing read batch size from {} to {}", new Object[]{this.name, readConsumer, this.readBatchSize, newReadBatchSize});
            }
            this.readBatchSize = newReadBatchSize;
        }
        this.readFailureBackoff.reduceToHalf();
        Consumer currentConsumer = (Consumer)ACTIVE_CONSUMER_UPDATER.get(this);
        if (this.isKeyHashRangeFiltered) {
            Iterator<Entry> iterator = entries.iterator();
            while (iterator.hasNext()) {
                Entry entry = iterator.next();
                byte[] key = this.peekStickyKey(entry.getDataBuffer());
                Consumer consumer = this.stickyKeyConsumerSelector.select(key);
                if (consumer != null && currentConsumer == consumer) continue;
                entry.release();
                iterator.remove();
            }
        }
        if (currentConsumer == null || readConsumer != currentConsumer) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] rewind because no available consumer found", (Object)this.name);
            }
            entries.forEach(Entry::release);
            this.cursor.rewind();
            if (currentConsumer != null) {
                this.notifyActiveConsumerChanged(currentConsumer);
                this.readMoreEntries(currentConsumer);
            }
        } else {
            EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
            EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entries.size());
            this.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, batchIndexesAcks, this.cursor, false, currentConsumer);
            this.dispatchEntriesToConsumer(currentConsumer, entries, batchSizes, batchIndexesAcks, sendMessageInfo, epoch);
        }
    }

    protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, SendMessageInfo sendMessageInfo, long epoch) {
        currentConsumer.sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), this.redeliveryTracker, epoch).addListener(future -> {
            if (future.isSuccess()) {
                this.acquirePermitsForDeliveredMessages(this.topic, this.cursor, entries.size(), sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());
                this.topicExecutor.execute(() -> {
                    PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer = this;
                    synchronized (persistentDispatcherSingleActiveConsumer) {
                        Consumer newConsumer = this.getActiveConsumer();
                        this.readMoreEntries(newConsumer);
                    }
                });
            }
        });
    }

    @Override
    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        this.topicExecutor.execute(() -> this.internalConsumerFlow(consumer));
    }

    private synchronized void internalConsumerFlow(Consumer consumer) {
        if (this.havePendingRead) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Ignoring flow control message since we already have a pending read req", (Object)this.name, (Object)consumer);
            }
        } else if (ACTIVE_CONSUMER_UPDATER.get(this) != consumer) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Ignoring flow control message since consumer is not active partition consumer", (Object)this.name, (Object)consumer);
            }
        } else if (this.readOnActiveConsumerTask != null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Ignoring flow control message since consumer is waiting for cursor to be rewinded", (Object)this.name, (Object)consumer);
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Trigger new read after receiving flow control message", (Object)this.name, (Object)consumer);
            }
            this.readMoreEntries(consumer);
        }
    }

    @Override
    public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
        this.topicExecutor.execute(() -> this.internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch));
    }

    private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
        if (consumerEpoch > consumer.getConsumerEpoch()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch [{}]", new Object[]{this.name, consumer, consumer.getConsumerEpoch(), consumerEpoch});
            }
            consumer.setConsumerEpoch(consumerEpoch);
        }
        if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
            log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend", (Object)this.name, (Object)consumer);
            return;
        }
        if (this.readOnActiveConsumerTask != null) {
            log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: consumer is waiting for cursor to be rewinded", (Object)this.name, (Object)consumer);
            return;
        }
        this.cursor.cancelPendingReadRequest();
        this.havePendingRead = false;
        this.cursor.rewind();
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", (Object)this.name, (Object)consumer);
        }
        this.readMoreEntries(consumer);
    }

    @Override
    public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
        this.redeliverUnacknowledgedMessages(consumer, -1L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void readMoreEntries(Consumer consumer) {
        if (null == consumer) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", (Object)this.topic.getName());
            }
            return;
        }
        if (this.havePendingRead) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Skipping read for the topic, Due to we have pending read.", (Object)this.topic.getName());
            }
            return;
        }
        if (consumer.getAvailablePermits() > 0) {
            PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer = this;
            synchronized (persistentDispatcherSingleActiveConsumer) {
                if (this.havePendingRead) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Skipping read for the topic, Due to we have pending read.", (Object)this.topic.getName());
                    }
                    return;
                }
                Pair<Integer, Long> calculateResult = this.calculateToRead(consumer);
                int messagesToRead = (Integer)calculateResult.getLeft();
                long bytesToRead = (Long)calculateResult.getRight();
                if (-1 == messagesToRead || bytesToRead == -1L) {
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}-{}] Schedule read of {} messages", new Object[]{this.name, consumer, messagesToRead});
                }
                this.havePendingRead = true;
                if (consumer.readCompacted()) {
                    this.topic.getCompactedTopic().asyncReadEntriesOrWait(this.cursor, messagesToRead, this.isFirstRead, this, consumer);
                } else {
                    ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
                    this.cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, (AsyncCallbacks.ReadEntriesCallback)this, (Object)readEntriesCtx, this.topic.getMaxReadPosition());
                }
            }
        } else if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Consumer buffer is full, pause reading", (Object)this.name, (Object)consumer);
        }
    }

    @Override
    protected void reScheduleRead() {
        if (this.isRescheduleReadInProgress.compareAndSet(false, true)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Reschedule message read in {} ms", new Object[]{this.topic.getName(), this.name, 1000});
            }
            this.topic.getBrokerService().executor().schedule(() -> {
                this.isRescheduleReadInProgress.set(false);
                Consumer currentConsumer = (Consumer)ACTIVE_CONSUMER_UPDATER.get(this);
                this.readMoreEntries(currentConsumer);
            }, 1000L, TimeUnit.MILLISECONDS);
        }
    }

    protected Pair<Integer, Long> calculateToRead(Consumer consumer) {
        int availablePermits = consumer.getAvailablePermits();
        if (!consumer.isWritable()) {
            availablePermits = 1;
        }
        int messagesToRead = Math.min(availablePermits, this.readBatchSize);
        long bytesToRead = this.serviceConfig.getDispatcherMaxReadSizeBytes();
        if (consumer.isPreciseDispatcherFlowControl()) {
            int avgMessagesPerEntry = Math.max(1, consumer.getAvgMessagesPerEntry());
            messagesToRead = Math.min((int)Math.ceil((double)availablePermits * 1.0 / (double)avgMessagesPerEntry), this.readBatchSize);
        }
        if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
            Pair<Integer, Long> calculateToRead;
            if (this.topic.getBrokerDispatchRateLimiter().isPresent()) {
                DispatchRateLimiter brokerRateLimiter = this.topic.getBrokerDispatchRateLimiter().get();
                if (this.reachDispatchRateLimit(brokerRateLimiter)) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded broker message-rate {}/{}, schedule after a {}", new Object[]{this.name, brokerRateLimiter.getDispatchRateOnMsg(), brokerRateLimiter.getDispatchRateOnByte(), 1000});
                    }
                    return Pair.of((Object)-1, (Object)-1L);
                }
                calculateToRead = this.updateMessagesToRead(brokerRateLimiter, messagesToRead, bytesToRead);
                messagesToRead = (Integer)calculateToRead.getLeft();
                bytesToRead = (Long)calculateToRead.getRight();
            }
            if (this.topic.getDispatchRateLimiter().isPresent()) {
                DispatchRateLimiter topicRateLimiter = this.topic.getDispatchRateLimiter().get();
                if (this.reachDispatchRateLimit(topicRateLimiter)) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", new Object[]{this.name, topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(), 1000});
                    }
                    return Pair.of((Object)-1, (Object)-1L);
                }
                calculateToRead = this.updateMessagesToRead(topicRateLimiter, messagesToRead, bytesToRead);
                messagesToRead = (Integer)calculateToRead.getLeft();
                bytesToRead = (Long)calculateToRead.getRight();
            }
            if (this.dispatchRateLimiter.isPresent()) {
                if (this.reachDispatchRateLimit(this.dispatchRateLimiter.get())) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] message-read exceeded subscription message-rate {}/{}, schedule after a {}", new Object[]{this.name, this.dispatchRateLimiter.get().getDispatchRateOnMsg(), this.dispatchRateLimiter.get().getDispatchRateOnByte(), 1000});
                    }
                    return Pair.of((Object)-1, (Object)-1L);
                }
                Pair<Integer, Long> calculateToRead2 = this.updateMessagesToRead(this.dispatchRateLimiter.get(), messagesToRead, bytesToRead);
                messagesToRead = (Integer)calculateToRead2.getLeft();
                bytesToRead = (Long)calculateToRead2.getRight();
            }
        }
        messagesToRead = Math.max(messagesToRead, 1);
        bytesToRead = Math.max(bytesToRead, 1L);
        return Pair.of((Object)messagesToRead, (Object)bytesToRead);
    }

    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
        this.topicExecutor.execute(() -> this.internalReadEntriesFailed(exception, ctx));
    }

    private synchronized void internalReadEntriesFailed(ManagedLedgerException exception, Object ctx) {
        this.havePendingRead = false;
        ReadEntriesCtx readEntriesCtx = (ReadEntriesCtx)ctx;
        Consumer c = readEntriesCtx.getConsumer();
        readEntriesCtx.recycle();
        if (exception instanceof ManagedLedgerException.ConcurrentWaitCallbackException) {
            return;
        }
        long waitTimeMillis = this.readFailureBackoff.next();
        if (exception instanceof ManagedLedgerException.NoMoreEntriesToReadException) {
            if (this.cursor.getNumberOfEntriesInBacklog(false) == 0L) {
                this.checkAndApplyReachedEndOfTopicOrTopicMigration(this.consumers);
            }
        } else if (exception.getCause() instanceof TransactionException.TransactionNotSealedException || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {
            waitTimeMillis = 1L;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds", new Object[]{this.name, exception.getMessage(), (double)waitTimeMillis / 1000.0});
            }
        } else if (!(exception instanceof ManagedLedgerException.TooManyRequestsException)) {
            log.error("[{}-{}] Error reading entries at {} : {} - Retrying to read in {} seconds", new Object[]{this.name, c, this.cursor.getReadPosition(), exception.getMessage(), (double)waitTimeMillis / 1000.0});
        } else if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds", new Object[]{this.name, c, this.cursor.getReadPosition(), exception.getMessage(), (double)waitTimeMillis / 1000.0});
        }
        Objects.requireNonNull(c);
        this.readBatchSize = this.serviceConfig.getDispatcherMinReadBatchSize();
        this.topic.getBrokerService().executor().schedule(() -> this.topicExecutor.execute(() -> {
            PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer = this;
            synchronized (persistentDispatcherSingleActiveConsumer) {
                Consumer currentConsumer = (Consumer)ACTIVE_CONSUMER_UPDATER.get(this);
                if (currentConsumer != null && !this.havePendingRead) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}-{}] Retrying read operation", (Object)this.name, (Object)c);
                    }
                    if (currentConsumer != c) {
                        this.notifyActiveConsumerChanged(currentConsumer);
                    }
                    this.readMoreEntries(currentConsumer);
                } else {
                    log.info("[{}-{}] Skipping read retry: Current Consumer {}, havePendingRead {}", new Object[]{this.name, c, currentConsumer, this.havePendingRead});
                }
            }
        }), waitTimeMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void addUnAckedMessages(int unAckMessages) {
    }

    @Override
    public RedeliveryTracker getRedeliveryTracker() {
        return this.redeliveryTracker;
    }

    @Override
    public Optional<DispatchRateLimiter> getRateLimiter() {
        return this.dispatchRateLimiter;
    }

    @Override
    public void updateRateLimiter() {
        if (!this.initializeDispatchRateLimiterIfNeeded()) {
            this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
        }
    }

    @Override
    public boolean initializeDispatchRateLimiterIfNeeded() {
        if (!this.dispatchRateLimiter.isPresent() && DispatchRateLimiter.isDispatchRateEnabled((DispatchRate)this.topic.getSubscriptionDispatchRate(this.getSubscriptionName()))) {
            this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this.topic, this.getSubscriptionName(), DispatchRateLimiter.Type.SUBSCRIPTION));
            return true;
        }
        return false;
    }

    @Override
    public CompletableFuture<Void> close() {
        IS_CLOSED_UPDATER.set(this, 1);
        this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
        return this.disconnectAllConsumers();
    }

    @Override
    public boolean checkAndUnblockIfStuck() {
        Consumer consumer = (Consumer)ACTIVE_CONSUMER_UPDATER.get(this);
        if (consumer == null || this.cursor.checkAndUpdateReadPositionChanged()) {
            return false;
        }
        int totalAvailablePermits = consumer.getAvailablePermits();
        if (totalAvailablePermits > 0 && !this.havePendingRead && this.cursor.getNumberOfEntriesInBacklog(false) > 0L) {
            log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", (Object)this.topic.getName(), (Object)this.name);
            this.readMoreEntries(consumer);
            return true;
        }
        return false;
    }

    public static class ReadEntriesCtx {
        private Consumer consumer;
        private long epoch;
        private final Recycler.Handle<ReadEntriesCtx> recyclerHandle;
        private static final Recycler<ReadEntriesCtx> RECYCLER = new Recycler<ReadEntriesCtx>(){

            protected ReadEntriesCtx newObject(Recycler.Handle<ReadEntriesCtx> recyclerHandle) {
                return new ReadEntriesCtx(recyclerHandle);
            }
        };

        private ReadEntriesCtx(Recycler.Handle<ReadEntriesCtx> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        public static ReadEntriesCtx create(Consumer consumer, long epoch) {
            ReadEntriesCtx readEntriesCtx = (ReadEntriesCtx)RECYCLER.get();
            readEntriesCtx.consumer = consumer;
            readEntriesCtx.epoch = epoch;
            return readEntriesCtx;
        }

        Consumer getConsumer() {
            return this.consumer;
        }

        long getEpoch() {
            return this.epoch;
        }

        public void recycle() {
            this.consumer = null;
            this.epoch = 0L;
            this.recyclerHandle.recycle((Object)this);
        }
    }
}

