package org.apache.pulsar.broker.service.persistent;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentReplicator.class */
public class PersistentReplicator extends AbstractReplicator implements Replicator, AsyncCallbacks.ReadEntriesCallback, AsyncCallbacks.DeleteCallback {
    // Topic this replicator belongs to; used for log context and the oldest-message expiry check.
    private final PersistentTopic topic;
    // Managed-ledger cursor through which entries are read, deleted, skipped and cleared.
    private final ManagedCursor cursor;
    // Upper bound for readBatchSize (the use sites in this file carry the literal 100).
    private static final int MaxReadBatchSize = 100;
    // Number of entries requested per cursor read: doubled (capped at 100) after each
    // successful read and reset to 1 after a read failure.
    private int readBatchSize;
    // 90% of producerQueueSize; the send callback only resumes reading once the number of
    // pending messages drops below this value.
    private final int producerQueueThreshold;
    // Messages handed to the producer whose send callback has not completed yet.
    // Only modified through PENDING_MESSAGES_UPDATER.
    private volatile int pendingMessages;
    // Integer stand-ins for boolean values of havePendingRead.
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    // 1 while a cursor read issued by readMoreEntries() is outstanding, 0 otherwise.
    // Only modified through HAVE_PENDING_READ_UPDATER.
    private volatile int havePendingRead;
    // Rate of messages/bytes forwarded to the remote producer.
    private final Rate msgOut;
    // Rate of messages dropped in readEntriesComplete() because they were expired.
    private final Rate msgExpired;
    // TTL passed to MessageImpl.isExpired(); updated through updateMessageTTL().
    private int messageTTLInSeconds;
    // Backoff used to delay the next read after readEntriesFailed(); halved after each successful read.
    private final Backoff readFailureBackoff;
    // Performs backlog expiry on behalf of expireMessages() and contributes to the expired-rate stat.
    private PersistentMessageExpiryMonitor expiryMonitor;
    // Backlog size at or above which expireMessages() runs without first checking the oldest message.
    private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
    // Mutable stats object refreshed by updateRates() and the stats getter, and returned to callers.
    private final ReplicatorStats stats;
    // Atomic accessors for the two volatile int fields above.
    private static final AtomicIntegerFieldUpdater<PersistentReplicator> PENDING_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentReplicator.class, "pendingMessages");
    private static final AtomicIntegerFieldUpdater<PersistentReplicator> HAVE_PENDING_READ_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentReplicator.class, "havePendingRead");
    private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentReplicator$ProducerSendCallback.class */
    public static final class ProducerSendCallback implements SendCallback {
        private PersistentReplicator replicator;
        private Entry entry;
        private MessageImpl msg;
        private final Recycler.Handle<ProducerSendCallback> recyclerHandle;
        private static final Recycler<ProducerSendCallback> RECYCLER = new Recycler<ProducerSendCallback>() { // from class: org.apache.pulsar.broker.service.persistent.PersistentReplicator.ProducerSendCallback.1
            protected ProducerSendCallback newObject(Recycler.Handle<ProducerSendCallback> handle) {
                return new ProducerSendCallback(handle, null);
            }

            /* renamed from: newObject, reason: collision with other method in class */
            protected /* bridge */ /* synthetic */ Object m102newObject(Recycler.Handle handle) {
                return newObject((Recycler.Handle<ProducerSendCallback>) handle);
            }
        };

        public void sendComplete(Exception exc) {
            if (exc != null) {
                PersistentReplicator.log.error("[{}][{} -> {}] Error producing on remote broker", new Object[]{this.replicator.topicName, this.replicator.localCluster, this.replicator.remoteCluster, exc});
                this.replicator.cursor.rewind();
            } else {
                if (PersistentReplicator.log.isDebugEnabled()) {
                    PersistentReplicator.log.debug("[{}][{} -> {}] Message persisted on remote broker", new Object[]{this.replicator.topicName, this.replicator.localCluster, this.replicator.remoteCluster});
                }
                this.replicator.cursor.asyncDelete(this.entry.getPosition(), this.replicator, this.entry.getPosition());
            }
            this.entry.release();
            int decrementAndGet = PersistentReplicator.PENDING_MESSAGES_UPDATER.decrementAndGet(this.replicator);
            if (decrementAndGet < this.replicator.producerQueueThreshold && PersistentReplicator.HAVE_PENDING_READ_UPDATER.get(this.replicator) == 0) {
                if (decrementAndGet == 0 || this.replicator.producer.isWritable()) {
                    this.replicator.readMoreEntries();
                } else if (PersistentReplicator.log.isDebugEnabled()) {
                    PersistentReplicator.log.debug("[{}][{} -> {}] Not resuming reads. pending: {} is-writable: {}", new Object[]{this.replicator.topicName, this.replicator.localCluster, this.replicator.remoteCluster, Integer.valueOf(decrementAndGet), Boolean.valueOf(this.replicator.producer.isWritable())});
                }
            }
            recycle();
        }

        private ProducerSendCallback(Recycler.Handle<ProducerSendCallback> handle) {
            this.recyclerHandle = handle;
        }

        static ProducerSendCallback create(PersistentReplicator persistentReplicator, Entry entry, MessageImpl messageImpl) {
            ProducerSendCallback producerSendCallback = (ProducerSendCallback) RECYCLER.get();
            producerSendCallback.replicator = persistentReplicator;
            producerSendCallback.entry = entry;
            producerSendCallback.msg = messageImpl;
            return producerSendCallback;
        }

        private void recycle() {
            this.replicator = null;
            this.entry = null;
            if (this.msg != null) {
                this.msg.recycle();
                this.msg = null;
            }
            this.recyclerHandle.recycle(this);
        }

        public void addCallback(SendCallback sendCallback) {
        }

        public SendCallback getNextSendCallback() {
            return null;
        }

        public CompletableFuture<MessageId> getFuture() {
            return null;
        }

        /* synthetic */ ProducerSendCallback(Recycler.Handle handle, ProducerSendCallback producerSendCallback) {
            this(handle);
        }
    }

    /**
     * Creates the replicator, initialises its counters, rates and expiry monitor, derives the read
     * batch size and queue threshold from the producer queue size, and starts the producer.
     *
     * @param persistentTopic topic being replicated
     * @param managedCursor   cursor used to read and acknowledge entries
     * @param str             forwarded unchanged to the superclass constructor
     * @param str2            forwarded unchanged to the superclass constructor
     * @param brokerService   forwarded unchanged to the superclass constructor
     */
    public PersistentReplicator(PersistentTopic persistentTopic, ManagedCursor managedCursor, String str, String str2, BrokerService brokerService) {
        super(persistentTopic.getName(), persistentTopic.replicatorPrefix, str, str2, brokerService);

        // Collaborators.
        this.topic = persistentTopic;
        this.cursor = managedCursor;

        // Counters and statistics.
        this.msgOut = new Rate();
        this.msgExpired = new Rate();
        this.messageTTLInSeconds = 0;
        this.readFailureBackoff = new Backoff(1L, TimeUnit.SECONDS, 1L, TimeUnit.MINUTES, 0L, TimeUnit.MILLISECONDS);
        this.stats = new ReplicatorStats();
        this.expiryMonitor = new PersistentMessageExpiryMonitor(this.topicName, Codec.decode(managedCursor.getName()), managedCursor);

        // Both flags start cleared.
        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
        PENDING_MESSAGES_UPDATER.set(this, 0);

        // Sizing derived from the producer queue configured by the superclass.
        this.readBatchSize = Math.min(this.producerQueueSize, MaxReadBatchSize);
        this.producerQueueThreshold = (int) (this.producerQueueSize * 0.9d);

        startProducer();
    }

    /**
     * Invoked with the newly created producer: resets the cursor read state, stores the producer and,
     * if the replicator is still in the Starting state, marks it Started and begins reading.
     * Otherwise the replicator is moved to Stopping and the producer is closed.
     *
     * @param producer the producer to replicate through
     */
    @Override // org.apache.pulsar.broker.service.AbstractReplicator
    protected void readEntries(Producer producer) {
        // Start from a clean slate: rewind, drop any outstanding read and clear the pending-read flag.
        this.cursor.rewind();
        this.cursor.cancelPendingReadRequest();
        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
        this.producer = (ProducerImpl) producer;

        final boolean movedToStarted = AbstractReplicator.STATE_UPDATER.compareAndSet(this, AbstractReplicator.State.Starting, AbstractReplicator.State.Started);
        if (movedToStarted) {
            log.info("[{}][{} -> {}] Created replicator producer", this.topicName, this.localCluster, this.remoteCluster);
            this.backOff.reset();
            this.cursor.setActive();
            readMoreEntries();
            return;
        }

        log.info("[{}][{} -> {}] Replicator was stopped while creating the producer. Closing it. Replicator state: {}", this.topicName, this.localCluster, this.remoteCluster, AbstractReplicator.STATE_UPDATER.get(this));
        AbstractReplicator.STATE_UPDATER.set(this, AbstractReplicator.State.Stopping);
        closeProducerAsync();
    }

    /**
     * @return the cursor's current mark-deleted position
     */
    @Override // org.apache.pulsar.broker.service.AbstractReplicator
    protected Position getReplicatorReadPosition() {
        final Position markDeleted = cursor.getMarkDeletedPosition();
        return markDeleted;
    }

    /**
     * @return the number of backlog entries reported by the cursor
     */
    @Override // org.apache.pulsar.broker.service.AbstractReplicator
    protected long getNumberOfEntriesInBacklog() {
        final long backlog = cursor.getNumberOfEntriesInBacklog();
        return backlog;
    }

    /**
     * Marks the underlying cursor as inactive.
     */
    @Override // org.apache.pulsar.broker.service.AbstractReplicator
    protected void disableReplicatorRead() {
        cursor.setInactive();
    }

    /**
     * Schedules the next asynchronous cursor read unless the producer queue is full or a read is
     * already outstanding. The request size is the smaller of the free queue slots and the
     * current batch size, reduced to a single entry while the producer is not writable.
     */
    protected void readMoreEntries() {
        final int freeSlots = this.producerQueueSize - PENDING_MESSAGES_UPDATER.get(this);
        if (freeSlots <= 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Producer queue is full, pause reading", this.topicName, this.localCluster, this.remoteCluster);
            }
            return;
        }

        int messagesToRead = Math.min(freeSlots, this.readBatchSize);
        if (!isWritable()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Throttling replication traffic because producer is not writable", this.topicName, this.localCluster, this.remoteCluster);
            }
            // Trickle one entry at a time while the producer cannot accept writes.
            messagesToRead = 1;
        }

        // Only one read may be in flight; whoever flips the flag issues the read.
        if (!HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Not scheduling read due to pending read. Messages To Read {}", this.topicName, this.localCluster, this.remoteCluster, messagesToRead);
            }
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Schedule read of {} messages", this.topicName, this.localCluster, this.remoteCluster, messagesToRead);
        }
        this.cursor.asyncReadEntriesOrWait(messagesToRead, this, (Object) null);
    }

    /**
     * Callback for a completed cursor read. Grows the read batch size, relaxes the read-failure
     * backoff, then processes each entry:
     * <ul>
     *   <li>entries that cannot be deserialized are logged, deleted from the cursor and released;</li>
     *   <li>already-replicated messages, messages not addressed to the remote cluster and expired
     *       messages are deleted from the cursor and released;</li>
     *   <li>if the replicator is not in the Started state, this and all following entries of the
     *       batch are released without being deleted or sent;</li>
     *   <li>everything else is sent through the producer with a {@link ProducerSendCallback}.</li>
     * </ul>
     * Finally the pending-read flag is cleared and another read is scheduled, unless at least one
     * message was sent and the producer is no longer writable.
     *
     * <p>The deserialization error handler covers only the {@code deserialize} call. A failure later
     * in the per-entry processing (for example in {@code sendAsync}) is reported by the per-entry
     * "Unexpected exception" handler and does not delete the entry from the cursor, so a message
     * that was never replicated is not acknowledged by mistake.
     *
     * @param list entries returned by the cursor
     * @param obj  read context (unused)
     */
    public void readEntriesComplete(List<Entry> list, Object obj) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Read entries complete of {} messages", this.topicName, this.localCluster, this.remoteCluster, list.size());
        }
        if (this.readBatchSize < MaxReadBatchSize) {
            int newReadBatchSize = Math.min(this.readBatchSize * 2, MaxReadBatchSize);
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Increasing read batch size from {} to {}", this.topicName, this.localCluster, this.remoteCluster, this.readBatchSize, newReadBatchSize);
            }
            this.readBatchSize = newReadBatchSize;
        }
        this.readFailureBackoff.reduceToHalf();

        boolean atLeastOneMessageSent = false;
        // Once one message is dropped because the producer is not ready, the rest of the batch is dropped too.
        boolean droppedBecauseNotReady = false;
        for (int i = 0; i < list.size(); i++) {
            try {
                Entry entry = list.get(i);
                int length = entry.getLength();
                ByteBuf dataBuffer = entry.getDataBuffer();

                MessageImpl msg;
                try {
                    msg = MessageImpl.deserialize(dataBuffer);
                } catch (Throwable th) {
                    // Only a genuine deserialization failure may discard the entry.
                    log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", this.topicName, this.localCluster, this.remoteCluster, entry.getPosition(), length, th.getMessage(), th);
                    this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    continue;
                }

                if (msg.isReplicated()) {
                    this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    msg.recycle();
                } else if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(this.remoteCluster)) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}][{} -> {}] Skipping message at {} / msg-id: {}: replicateTo {}", this.topicName, this.localCluster, this.remoteCluster, entry.getPosition(), msg.getMessageId(), msg.getReplicateTo());
                    }
                    this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    msg.recycle();
                } else if (msg.isExpired(this.messageTTLInSeconds)) {
                    this.msgExpired.recordEvent(0L);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}][{} -> {}] Discarding expired message at {} / msg-id: {}", this.topicName, this.localCluster, this.remoteCluster, entry.getPosition(), msg.getMessageId());
                    }
                    this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    msg.recycle();
                } else if (AbstractReplicator.STATE_UPDATER.get(this) != AbstractReplicator.State.Started || droppedBecauseNotReady) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}][{} -> {}] Dropping read message at {} because producer is not ready", this.topicName, this.localCluster, this.remoteCluster, entry.getPosition());
                    }
                    droppedBecauseNotReady = true;
                    entry.release();
                    msg.recycle();
                } else {
                    // Account for the message before handing it over; the send callback decrements again.
                    PENDING_MESSAGES_UPDATER.incrementAndGet(this);
                    this.msgOut.recordEvent(dataBuffer.readableBytes());
                    msg.setReplicatedFrom(this.localCluster);
                    dataBuffer.retain();
                    this.producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
                    atLeastOneMessageSent = true;
                }
            } catch (Exception e) {
                log.error("[{}][{} -> {}] Unexpected exception: {}", this.topicName, this.localCluster, this.remoteCluster, e.getMessage(), e);
            }
        }

        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
        if (!atLeastOneMessageSent || isWritable()) {
            readMoreEntries();
        } else if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Pausing replication traffic. at-least-one: {} is-writable: {}", this.topicName, this.localCluster, this.remoteCluster, atLeastOneMessageSent, isWritable());
        }
    }

    /**
     * Marks the cursor active when a connected producer exists and inactive otherwise.
     *
     * <p>The producer field is read once into a local so that the null check and the
     * {@code isConnected()} call observe the same reference.
     */
    public void updateCursorState() {
        final ProducerImpl currentProducer = this.producer;
        if (currentProducer != null && currentProducer.isConnected()) {
            this.cursor.setActive();
        } else {
            this.cursor.setInactive();
        }
    }

    /**
     * Callback for a failed cursor read.
     *
     * <p>If the replicator is no longer Started the failure is only logged. Otherwise the read batch
     * size is reset to 1 and the next backoff delay is obtained. A closed cursor causes the producer
     * to be closed without retry; any other failure clears the pending-read flag and schedules
     * {@link #readMoreEntries()} after the backoff delay.
     *
     * <p>All messages use {@code topicName} as the first log argument, matching the rest of the class.
     *
     * @param managedLedgerException the read failure
     * @param obj                    read context, included in the log output
     */
    public void readEntriesFailed(ManagedLedgerException managedLedgerException, Object obj) {
        if (AbstractReplicator.STATE_UPDATER.get(this) != AbstractReplicator.State.Started) {
            log.info("[{}][{} -> {}] Replicator was stopped while reading entries. Stop reading. Replicator state: {}", this.topicName, this.localCluster, this.remoteCluster, AbstractReplicator.STATE_UPDATER.get(this));
            return;
        }

        // Be conservative after a failure: restart with single-entry reads.
        this.readBatchSize = 1;
        long waitTimeMillis = this.readFailureBackoff.next();

        if (managedLedgerException instanceof ManagedLedgerException.CursorAlreadyClosedException) {
            log.error("[{}][{} -> {}] Error reading entries because replicator is already deleted and cursor is already closed {}, ({})", this.topicName, this.localCluster, this.remoteCluster, obj, managedLedgerException.getMessage(), managedLedgerException);
            // The cursor is gone, so retrying is pointless.
            closeProducerAsync();
            return;
        }

        if (!(managedLedgerException instanceof ManagedLedgerException.TooManyRequestsException)) {
            log.error("[{}][{} -> {}] Error reading entries at {}. Retrying to read in {}s. ({})", this.topicName, this.localCluster, this.remoteCluster, obj, waitTimeMillis / 1000.0d, managedLedgerException.getMessage(), managedLedgerException);
        } else if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Throttled by bookies while reading at {}. Retrying to read in {}s. ({})", this.topicName, this.localCluster, this.remoteCluster, obj, waitTimeMillis / 1000.0d, managedLedgerException.getMessage(), managedLedgerException);
        }

        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
        this.brokerService.executor().schedule(this::readMoreEntries, waitTimeMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Asynchronously clears the cursor's backlog.
     *
     * @return a future completed with {@code null} on success, or exceptionally with the
     *         {@link ManagedLedgerException} reported by the cursor
     */
    public CompletableFuture<Void> clearBacklog() {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Backlog size before clearing: {}", this.topicName, this.localCluster, this.remoteCluster, this.cursor.getNumberOfEntriesInBacklog());
        }

        final AsyncCallbacks.ClearBacklogCallback callback = new AsyncCallbacks.ClearBacklogCallback() {
            public void clearBacklogComplete(Object obj) {
                if (PersistentReplicator.log.isDebugEnabled()) {
                    PersistentReplicator.log.debug("[{}][{} -> {}] Backlog size after clearing: {}", PersistentReplicator.this.topicName, PersistentReplicator.this.localCluster, PersistentReplicator.this.remoteCluster, PersistentReplicator.this.cursor.getNumberOfEntriesInBacklog());
                }
                result.complete(null);
            }

            public void clearBacklogFailed(ManagedLedgerException managedLedgerException, Object obj) {
                PersistentReplicator.log.error("[{}][{} -> {}] Failed to clear backlog", PersistentReplicator.this.topicName, PersistentReplicator.this.localCluster, PersistentReplicator.this.remoteCluster, managedLedgerException);
                result.completeExceptionally(managedLedgerException);
            }
        };
        this.cursor.asyncClearBacklog(callback, (Object) null);
        return result;
    }

    /**
     * Asynchronously skips entries on the cursor, excluding individually deleted entries.
     *
     * @param i number of entries to skip
     * @return a future completed with {@code null} on success, or exceptionally with the
     *         {@link ManagedLedgerException} reported by the cursor
     */
    public CompletableFuture<Void> skipMessages(final int i) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Skipping {} messages, current backlog {}", this.topicName, this.localCluster, this.remoteCluster, i, this.cursor.getNumberOfEntriesInBacklog());
        }

        final AsyncCallbacks.SkipEntriesCallback callback = new AsyncCallbacks.SkipEntriesCallback() {
            public void skipEntriesComplete(Object obj) {
                if (PersistentReplicator.log.isDebugEnabled()) {
                    PersistentReplicator.log.debug("[{}][{} -> {}] Skipped {} messages, new backlog {}", PersistentReplicator.this.topicName, PersistentReplicator.this.localCluster, PersistentReplicator.this.remoteCluster, i, PersistentReplicator.this.cursor.getNumberOfEntriesInBacklog());
                }
                result.complete(null);
            }

            public void skipEntriesFailed(ManagedLedgerException managedLedgerException, Object obj) {
                PersistentReplicator.log.error("[{}][{} -> {}] Failed to skip {} messages", PersistentReplicator.this.topicName, PersistentReplicator.this.localCluster, PersistentReplicator.this.remoteCluster, i, managedLedgerException);
                result.completeExceptionally(managedLedgerException);
            }
        };
        this.cursor.asyncSkipEntries(i, ManagedCursor.IndividualDeletedEntries.Exclude, callback, (Object) null);
        return result;
    }

    /**
     * Asynchronously fetches the N-th entry from the cursor, excluding individually deleted entries.
     *
     * @param i index passed to the cursor's {@code asyncGetNthEntry}
     * @return a future completed with the entry, or exceptionally with the
     *         {@link ManagedLedgerException} reported by the cursor
     */
    public CompletableFuture<Entry> peekNthMessage(int i) {
        final CompletableFuture<Entry> result = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Getting message at position {}", this.topicName, this.localCluster, this.remoteCluster, i);
        }

        final AsyncCallbacks.ReadEntryCallback callback = new AsyncCallbacks.ReadEntryCallback() {
            public void readEntryFailed(ManagedLedgerException managedLedgerException, Object obj) {
                result.completeExceptionally(managedLedgerException);
            }

            public void readEntryComplete(Entry entry, Object obj) {
                result.complete(entry);
            }
        };
        this.cursor.asyncGetNthEntry(i, ManagedCursor.IndividualDeletedEntries.Exclude, callback, (Object) null);
        return result;
    }

    /**
     * Delete callback (success): logs the deleted position at debug level.
     *
     * @param obj context passed to {@code asyncDelete}; callers in this class pass the entry position
     */
    public void deleteComplete(Object obj) {
        if (!log.isDebugEnabled()) {
            return;
        }
        log.debug("[{}][{} -> {}] Deleted message at {}", this.topicName, this.localCluster, this.remoteCluster, obj);
    }

    /**
     * Delete callback (failure): logs the position and the cause at error level.
     *
     * @param managedLedgerException the failure reported by the cursor
     * @param obj                    context passed to {@code asyncDelete}; callers in this class pass the entry position
     */
    public void deleteFailed(ManagedLedgerException managedLedgerException, Object obj) {
        final String reason = managedLedgerException.getMessage();
        log.error("[{}][{} -> {}] Failed to delete message at {}: {}", this.topicName, this.localCluster, this.remoteCluster, obj, reason, managedLedgerException);
    }

    /**
     * Recomputes the outbound and expired rates and copies them into the stats object.
     * The expired rate also includes the rate reported by the expiry monitor.
     */
    @Override // org.apache.pulsar.broker.service.Replicator
    public void updateRates() {
        // Refresh both rate windows before publishing any value.
        msgOut.calculateRate();
        msgExpired.calculateRate();

        stats.msgRateOut = msgOut.getRate();
        stats.msgThroughputOut = msgOut.getValueRate();
        stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
    }

    /**
     * Refreshes and returns the replicator statistics: backlog size, connection state,
     * replication delay and the outbound connection details.
     *
     * <p>The producer field is read once into a local and that snapshot is used for both the
     * {@code connected} flag and the connection details, so they describe the same producer and
     * the null check cannot be invalidated by a concurrent reset of the field.
     *
     * @return the shared, mutable stats instance owned by this replicator
     */
    @Override // org.apache.pulsar.broker.service.Replicator
    /* renamed from: getStats */
    public ReplicatorStats mo83getStats() {
        final ProducerImpl currentProducer = this.producer;

        this.stats.replicationBacklog = this.cursor.getNumberOfEntriesInBacklog();
        this.stats.connected = currentProducer != null && currentProducer.isConnected();
        this.stats.replicationDelayInSeconds = getReplicationDelayInSeconds();
        if (currentProducer != null) {
            this.stats.outboundConnection = currentProducer.getConnectionId();
            this.stats.outboundConnectedSince = currentProducer.getConnectedSince();
        } else {
            this.stats.outboundConnection = null;
            this.stats.outboundConnectedSince = null;
        }
        return this.stats;
    }

    /**
     * Sets the TTL used when checking whether a read message has expired.
     *
     * @param i new TTL in seconds
     */
    public void updateMessageTTL(int i) {
        messageTTLInSeconds = i;
    }

    /**
     * Returns the producer-reported delay converted to whole seconds, or 0 when there is no producer.
     *
     * <p>The producer field is read once into a local so that the null check and the call use the
     * same reference.
     */
    private long getReplicationDelayInSeconds() {
        final ProducerImpl currentProducer = this.producer;
        if (currentProducer == null) {
            return 0L;
        }
        return TimeUnit.MILLISECONDS.toSeconds(currentProducer.getDelayInMillis());
    }

    /**
     * Triggers message expiry on the backlog when it is worth checking: the backlog must be
     * non-empty and either hold at least {@code MINIMUM_BACKLOG_FOR_EXPIRY_CHECK} entries or have
     * an oldest message that the topic reports as expired.
     *
     * @param i message TTL in seconds
     */
    public void expireMessages(int i) {
        if (this.cursor.getNumberOfEntriesInBacklog() == 0) {
            // Nothing to expire.
            return;
        }
        final boolean largeBacklog = this.cursor.getNumberOfEntriesInBacklog() >= MINIMUM_BACKLOG_FOR_EXPIRY_CHECK;
        if (largeBacklog || this.topic.isOldestMessageExpired(this.cursor, i)) {
            this.expiryMonitor.expireMessages(i);
        }
    }
}
