/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Replicator that reads entries of a {@link PersistentTopic} through a dedicated {@link ManagedCursor}
 * and republishes them to a remote cluster with the producer held by {@link AbstractReplicator}.
 *
 * <p>The instance registers itself as the cursor's read callback and delete callback. Two counters are
 * maintained through {@link AtomicIntegerFieldUpdater}s: the number of messages handed to the producer
 * whose send callback has not run yet ({@code pendingMessages}), and a 0/1 flag marking that a cursor
 * read is outstanding ({@code havePendingRead}).
 */
public class PersistentReplicator extends AbstractReplicator
        implements Replicator, AsyncCallbacks.ReadEntriesCallback, AsyncCallbacks.DeleteCallback {
    private final PersistentTopic topic;
    private final ManagedCursor cursor;

    /** Replication producer queue size, taken from the broker configuration. */
    private final int producerQueueSize;
    /** Upper bound for {@link #readBatchSize}. */
    private static final int MaxReadBatchSize = 100;
    /** Entries requested per cursor read; doubled after a successful read, reset to 1 after a failed one. */
    private int readBatchSize;
    /** 90% of {@link #producerQueueSize}; send callbacks only resume reads below this pending count. */
    private final int producerQueueThreshold;

    private static final AtomicIntegerFieldUpdater<PersistentReplicator> PENDING_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentReplicator.class, "pendingMessages");
    private volatile int pendingMessages = 0;

    private static final int FALSE = 0;
    private static final int TRUE = 1;
    private static final AtomicIntegerFieldUpdater<PersistentReplicator> HAVE_PENDING_READ_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentReplicator.class, "havePendingRead");
    private volatile int havePendingRead = FALSE;

    private final Rate msgOut = new Rate();
    private final Rate msgExpired = new Rate();
    private int messageTTLInSeconds = 0;
    private final Backoff readFailureBackoff = new Backoff(1L, TimeUnit.SECONDS, 1L, TimeUnit.MINUTES);
    private PersistentMessageExpiryMonitor expiryMonitor;
    /** Below this backlog, {@link #expireMessages(int)} first checks whether the oldest message is expired. */
    private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
    private final ReplicatorStats stats = new ReplicatorStats();
    private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);

    /**
     * Creates the replicator, derives the read batch size and resume threshold from the configured
     * replication producer queue size, and starts the producer.
     *
     * @param topic the local topic being replicated
     * @param cursor the cursor used to read (and acknowledge) entries to replicate
     * @param localCluster name of the local cluster
     * @param remoteCluster name of the cluster messages are replicated to
     * @param brokerService broker service providing configuration and the scheduling executor
     */
    public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster, BrokerService brokerService) {
        super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);
        this.topic = topic;
        this.cursor = cursor;
        this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor);
        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
        PENDING_MESSAGES_UPDATER.set(this, 0);
        this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
        this.readBatchSize = Math.min(producerQueueSize, MaxReadBatchSize);
        this.producerQueueThreshold = (int) (producerQueueSize * 0.9);
        startProducer();
    }

    /**
     * Stores the given producer and, if the replicator is still in the {@code Starting} state, moves it
     * to {@code Started} and begins reading. Otherwise the state is set to {@code Stopping} and the
     * producer is closed.
     *
     * <p>The cursor is rewound and any pending read request cancelled first, so reading restarts cleanly.
     *
     * @param producer the producer to publish replicated messages with; must be a {@link ProducerImpl}
     */
    @Override
    protected void readEntries(Producer producer) {
        cursor.rewind();
        cursor.cancelPendingReadRequest();
        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
        this.producer = (ProducerImpl) producer;

        if (AbstractReplicator.STATE_UPDATER.compareAndSet(this, AbstractReplicator.State.Starting, AbstractReplicator.State.Started)) {
            log.info("[{}][{} -> {}] Created replicator producer", topicName, localCluster, remoteCluster);
            backOff.reset();
            cursor.setActive();
            readMoreEntries();
            return;
        }

        log.info("[{}][{} -> {}] Replicator was stopped while creating the producer. Closing it. Replicator state: {}",
                topicName, localCluster, remoteCluster, AbstractReplicator.STATE_UPDATER.get(this));
        AbstractReplicator.STATE_UPDATER.set(this, AbstractReplicator.State.Stopping);
        closeProducerAsync();
    }

    /** @return the cursor's mark-deleted position */
    @Override
    protected Position getReplicatorReadPosition() {
        return cursor.getMarkDeletedPosition();
    }

    /** @return the number of entries in the cursor's backlog */
    @Override
    protected long getNumberOfEntriesInBacklog() {
        return cursor.getNumberOfEntriesInBacklog();
    }

    /** Marks the cursor inactive. */
    @Override
    protected void disableReplicatorRead() {
        cursor.setInactive();
    }

    /**
     * Schedules an asynchronous cursor read if the producer queue has room and no read is outstanding.
     *
     * <p>The number of entries requested is the smaller of the free queue slots and the current read
     * batch size, reduced to a single entry while the producer reports it is not writable.
     */
    protected void readMoreEntries() {
        int availablePermits = producerQueueSize - PENDING_MESSAGES_UPDATER.get(this);
        if (availablePermits <= 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Producer queue is full, pause reading", topicName, localCluster, remoteCluster);
            }
            return;
        }

        int messagesToRead = Math.min(availablePermits, readBatchSize);
        if (!isWritable()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Throttling replication traffic because producer is not writable", topicName, localCluster, remoteCluster);
            }
            // Keep making minimal progress while the producer cannot accept more data.
            messagesToRead = 1;
        }

        // Only the caller that flips the flag from FALSE to TRUE issues the read.
        if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Schedule read of {} messages", topicName, localCluster, remoteCluster, messagesToRead);
            }
            cursor.asyncReadEntriesOrWait(messagesToRead, this, null);
        } else if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Not scheduling read due to pending read. Messages To Read {}", topicName, localCluster, remoteCluster, messagesToRead);
        }
    }

    /**
     * Read callback: processes a batch of entries returned by the cursor.
     *
     * <p>Each entry is deserialized and then either acknowledged without being sent (undeserializable,
     * already replicated, not targeted at the remote cluster, or expired), dropped without
     * acknowledgement (replicator not {@code Started}, or an earlier entry of this batch was dropped
     * for that reason), or handed to the producer. Afterwards the pending-read flag is cleared and a
     * new read is scheduled, unless something was sent and the producer is not writable.
     *
     * @param entries the entries read from the cursor
     * @param ctx read context (unused)
     */
    @Override
    public void readEntriesComplete(List<Entry> entries, Object ctx) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Read entries complete of {} messages", topicName, localCluster, remoteCluster, entries.size());
        }

        if (readBatchSize < MaxReadBatchSize) {
            int newReadBatchSize = Math.min(readBatchSize * 2, MaxReadBatchSize);
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Increasing read batch size from {} to {}", topicName, localCluster, remoteCluster, readBatchSize, newReadBatchSize);
            }
            readBatchSize = newReadBatchSize;
        }

        readFailureBackoff.reduceToHalf();

        boolean atLeastOneMessageSentForReplication = false;
        boolean isLocalMessageSkippedOnce = false;
        // Index of the first entry that has NOT been picked up for processing yet.
        int nextIndex = 0;
        try {
            while (nextIndex < entries.size()) {
                Entry entry = entries.get(nextIndex);
                nextIndex++;

                int length = entry.getLength();
                ByteBuf headersAndPayload = entry.getDataBuffer();
                MessageImpl msg;
                try {
                    msg = MessageImpl.deserialize(headersAndPayload);
                } catch (Throwable t) {
                    log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}",
                            topicName, localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t);
                    // Acknowledge the unreadable entry so it does not block replication.
                    cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    continue;
                }

                if (msg.isReplicated()) {
                    // Message itself arrived through replication: acknowledge without forwarding.
                    cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    msg.recycle();
                    continue;
                }

                if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(remoteCluster)) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}][{} -> {}] Skipping message at {} / msg-id: {}: replicateTo {}",
                                topicName, localCluster, remoteCluster, entry.getPosition(), msg.getMessageId(), msg.getReplicateTo());
                    }
                    cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    msg.recycle();
                    continue;
                }

                if (msg.isExpired(messageTTLInSeconds)) {
                    msgExpired.recordEvent(0L);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}][{} -> {}] Discarding expired message at {} / msg-id: {}",
                                topicName, localCluster, remoteCluster, entry.getPosition(), msg.getMessageId());
                    }
                    cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                    entry.release();
                    msg.recycle();
                    continue;
                }

                if (AbstractReplicator.STATE_UPDATER.get(this) != AbstractReplicator.State.Started || isLocalMessageSkippedOnce) {
                    // Once one sendable message is dropped, every later sendable message of this batch
                    // is dropped as well (the flag stays set), so none is sent after a dropped one.
                    if (log.isDebugEnabled()) {
                        log.debug("[{}][{} -> {}] Dropping read message at {} because producer is not ready",
                                topicName, localCluster, remoteCluster, entry.getPosition());
                    }
                    isLocalMessageSkippedOnce = true;
                    entry.release();
                    msg.recycle();
                    continue;
                }

                // Forward to the remote cluster; entry and msg are released by ProducerSendCallback.
                PENDING_MESSAGES_UPDATER.incrementAndGet(this);
                msgOut.recordEvent(headersAndPayload.readableBytes());
                msg.setReplicatedFrom(localCluster);
                headersAndPayload.retain();
                producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
                atLeastOneMessageSentForReplication = true;
            }
        } catch (Exception e) {
            log.error("[{}][{} -> {}] Unexpected exception: {}", topicName, localCluster, remoteCluster, e.getMessage(), e);
            // The entry being processed when the exception occurred is in an unknown state and is left
            // alone; entries that were never picked up are released here so they are not leaked.
            releaseEntriesFrom(entries, nextIndex);
        }

        HAVE_PENDING_READ_UPDATER.set(this, FALSE);

        if (atLeastOneMessageSentForReplication && !isWritable()) {
            // Reads are resumed from ProducerSendCallback once sends complete.
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Pausing replication traffic. at-least-one: {} is-writable: {}",
                        topicName, localCluster, remoteCluster, atLeastOneMessageSentForReplication, isWritable());
            }
        } else {
            readMoreEntries();
        }
    }

    /** Releases every entry of {@code entries} starting at {@code fromIndex}, logging release failures. */
    private void releaseEntriesFrom(List<Entry> entries, int fromIndex) {
        for (int i = fromIndex; i < entries.size(); i++) {
            try {
                entries.get(i).release();
            } catch (Exception releaseError) {
                log.warn("[{}][{} -> {}] Failed to release unprocessed entry: {}",
                        topicName, localCluster, remoteCluster, releaseError.getMessage(), releaseError);
            }
        }
    }

    /** Marks the cursor active when a producer exists and is connected, inactive otherwise. */
    public void updateCursorState() {
        // Single read of the field so the null check and the call see the same producer.
        ProducerImpl currentProducer = this.producer;
        if (currentProducer != null && currentProducer.isConnected()) {
            cursor.setActive();
        } else {
            cursor.setInactive();
        }
    }

    /**
     * Read callback for a failed cursor read.
     *
     * <p>Does nothing but log when the replicator is not {@code Started}. Otherwise the read batch size
     * is reset to 1 and the next backoff delay is taken; a closed cursor leads to closing the producer,
     * any other failure clears the pending-read flag and schedules a retry after the backoff delay.
     *
     * @param exception the read failure
     * @param ctx read context, only used for logging
     */
    @Override
    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
        if (AbstractReplicator.STATE_UPDATER.get(this) != AbstractReplicator.State.Started) {
            log.info("[{}][{} -> {}] Replicator was stopped while reading entries. Stop reading. Replicator state: {}",
                    topicName, localCluster, remoteCluster, AbstractReplicator.STATE_UPDATER.get(this));
            return;
        }

        // Restart with the smallest batch after a failure.
        readBatchSize = 1;
        long waitTimeMillis = readFailureBackoff.next();

        if (exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
            log.error("[{}][{} -> {}] Error reading entries because replicator is already deleted and cursor is already closed {}, ({})",
                    topicName, localCluster, remoteCluster, ctx, exception.getMessage(), exception);
            // The cursor is gone, so there is nothing to retry.
            closeProducerAsync();
            return;
        }

        if (!(exception instanceof ManagedLedgerException.TooManyRequestsException)) {
            log.error("[{}][{} -> {}] Error reading entries at {}. Retrying to read in {}s. ({})",
                    topicName, localCluster, remoteCluster, ctx, waitTimeMillis / 1000.0, exception.getMessage(), exception);
        } else if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Throttled by bookies while reading at {}. Retrying to read in {}s. ({})",
                    topicName, localCluster, remoteCluster, ctx, waitTimeMillis / 1000.0, exception.getMessage(), exception);
        }

        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
        brokerService.executor().schedule(this::readMoreEntries, waitTimeMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Asynchronously clears the cursor's backlog.
     *
     * @return a future completed with {@code null} on success, or exceptionally with the
     *         {@link ManagedLedgerException} reported by the cursor
     */
    public CompletableFuture<Void> clearBacklog() {
        final CompletableFuture<Void> future = new CompletableFuture<>();

        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Backlog size before clearing: {}", topicName, localCluster, remoteCluster, cursor.getNumberOfEntriesInBacklog());
        }

        cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {
            @Override
            public void clearBacklogComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Backlog size after clearing: {}", topicName, localCluster, remoteCluster, cursor.getNumberOfEntriesInBacklog());
                }
                future.complete(null);
            }

            @Override
            public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}][{} -> {}] Failed to clear backlog", topicName, localCluster, remoteCluster, exception);
                future.completeExceptionally(exception);
            }
        }, null);

        return future;
    }

    /**
     * Asynchronously skips entries on the cursor, excluding individually deleted entries from the count.
     *
     * @param numMessagesToSkip number of entries to skip
     * @return a future completed with {@code null} on success, or exceptionally with the
     *         {@link ManagedLedgerException} reported by the cursor
     */
    public CompletableFuture<Void> skipMessages(final int numMessagesToSkip) {
        final CompletableFuture<Void> future = new CompletableFuture<>();

        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Skipping {} messages, current backlog {}", topicName, localCluster, remoteCluster, numMessagesToSkip, cursor.getNumberOfEntriesInBacklog());
        }

        cursor.asyncSkipEntries(numMessagesToSkip, ManagedCursor.IndividualDeletedEntries.Exclude, new AsyncCallbacks.SkipEntriesCallback() {
            @Override
            public void skipEntriesComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Skipped {} messages, new backlog {}", topicName, localCluster, remoteCluster, numMessagesToSkip, cursor.getNumberOfEntriesInBacklog());
                }
                future.complete(null);
            }

            @Override
            public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}][{} -> {}] Failed to skip {} messages", topicName, localCluster, remoteCluster, numMessagesToSkip, exception);
                future.completeExceptionally(exception);
            }
        }, null);

        return future;
    }

    /**
     * Asynchronously fetches the N-th entry from the cursor, excluding individually deleted entries.
     *
     * @param messagePosition ordinal of the entry to fetch, passed through to the cursor
     * @return a future completed with the entry, or exceptionally with the
     *         {@link ManagedLedgerException} reported by the cursor
     */
    public CompletableFuture<Entry> peekNthMessage(int messagePosition) {
        final CompletableFuture<Entry> future = new CompletableFuture<>();

        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Getting message at position {}", topicName, localCluster, remoteCluster, messagePosition);
        }

        cursor.asyncGetNthEntry(messagePosition, ManagedCursor.IndividualDeletedEntries.Exclude, new AsyncCallbacks.ReadEntryCallback() {
            @Override
            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                future.completeExceptionally(exception);
            }

            @Override
            public void readEntryComplete(Entry entry, Object ctx) {
                future.complete(entry);
            }
        }, null);

        return future;
    }

    /**
     * Delete callback: logs the deleted position at debug level.
     *
     * @param ctx the context passed to the delete call (the entry position for deletes issued here)
     */
    @Override
    public void deleteComplete(Object ctx) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Deleted message at {}", topicName, localCluster, remoteCluster, ctx);
        }
    }

    /**
     * Delete callback: logs the failure; no retry is attempted here.
     *
     * @param exception the delete failure
     * @param ctx the context passed to the delete call (the entry position for deletes issued here)
     */
    @Override
    public void deleteFailed(ManagedLedgerException exception, Object ctx) {
        log.error("[{}][{} -> {}] Failed to delete message at {}: {}", topicName, localCluster, remoteCluster, ctx, exception.getMessage(), exception);
    }

    /** Recomputes the outbound and expired rates and copies them into the stats object. */
    @Override
    public void updateRates() {
        msgOut.calculateRate();
        msgExpired.calculateRate();
        stats.msgRateOut = msgOut.getRate();
        stats.msgThroughputOut = msgOut.getValueRate();
        stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
    }

    /**
     * Refreshes backlog, connection and delay information in the shared stats object.
     *
     * @return the replicator's stats instance (the same object on every call)
     */
    @Override
    public ReplicatorStats getStats() {
        // Single read of the field so all connection-related stats describe the same producer.
        ProducerImpl currentProducer = this.producer;

        stats.replicationBacklog = cursor.getNumberOfEntriesInBacklog();
        stats.connected = currentProducer != null && currentProducer.isConnected();
        stats.replicationDelayInSeconds = getReplicationDelayInSeconds();

        if (currentProducer != null) {
            stats.outboundConnection = currentProducer.getConnectionId();
            stats.outboundConnectedSince = currentProducer.getConnectedSince();
        } else {
            stats.outboundConnection = null;
            stats.outboundConnectedSince = null;
        }
        return stats;
    }

    /**
     * Sets the TTL used by {@link #readEntriesComplete(List, Object)} to discard expired messages.
     *
     * @param messageTTLInSeconds message time-to-live in seconds
     */
    public void updateMessageTTL(int messageTTLInSeconds) {
        this.messageTTLInSeconds = messageTTLInSeconds;
    }

    /** @return the producer's delay converted to whole seconds, or 0 when there is no producer */
    private long getReplicationDelayInSeconds() {
        ProducerImpl currentProducer = this.producer;
        if (currentProducer == null) {
            return 0L;
        }
        return TimeUnit.MILLISECONDS.toSeconds(currentProducer.getDelayInMillis());
    }

    /**
     * Triggers the expiry monitor unless the backlog is empty, or is small and its oldest message has
     * not expired yet.
     *
     * @param messageTTLInSeconds message time-to-live in seconds
     */
    public void expireMessages(int messageTTLInSeconds) {
        long backlog = cursor.getNumberOfEntriesInBacklog();
        if (backlog == 0L) {
            return;
        }
        if (backlog < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK && !topic.isOldestMessageExpired(cursor, messageTTLInSeconds)) {
            return;
        }
        expiryMonitor.expireMessages(messageTTLInSeconds);
    }

    /**
     * Recyclable send callback attached to every message handed to the replication producer.
     *
     * <p>On completion it acknowledges the entry on success or rewinds the cursor on failure, releases
     * the entry, decrements the pending counter, possibly resumes reading, and returns itself (and the
     * message) to their recyclers.
     */
    private static final class ProducerSendCallback implements SendCallback {
        private PersistentReplicator replicator;
        private Entry entry;
        private MessageImpl msg;
        private final Recycler.Handle recyclerHandle;
        private static final Recycler<ProducerSendCallback> RECYCLER = new Recycler<ProducerSendCallback>() {
            @Override
            protected ProducerSendCallback newObject(Recycler.Handle handle) {
                return new ProducerSendCallback(handle);
            }
        };

        /**
         * Handles the outcome of a send.
         *
         * @param exception the send failure, or {@code null} when the send succeeded
         */
        @Override
        public void sendComplete(Exception exception) {
            if (exception != null) {
                log.error("[{}][{} -> {}] Error producing on remote broker", replicator.topicName, replicator.localCluster, replicator.remoteCluster, exception);
                // Rewind so the entries are read again.
                replicator.cursor.rewind();
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Message persisted on remote broker", replicator.topicName, replicator.localCluster, replicator.remoteCluster);
                }
                replicator.cursor.asyncDelete(entry.getPosition(), replicator, entry.getPosition());
            }
            entry.release();

            int pending = PENDING_MESSAGES_UPDATER.decrementAndGet(replicator);

            // Resume reading only once the queue drained below the threshold and no read is in flight.
            if (pending < replicator.producerQueueThreshold && HAVE_PENDING_READ_UPDATER.get(replicator) == FALSE) {
                if (pending == 0 || isProducerWritable(replicator)) {
                    replicator.readMoreEntries();
                } else if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Not resuming reads. pending: {} is-writable: {}",
                            replicator.topicName, replicator.localCluster, replicator.remoteCluster, pending, isProducerWritable(replicator));
                }
            }

            recycle();
        }

        /** Null-safe writability check on a single snapshot of the replicator's producer field. */
        private static boolean isProducerWritable(PersistentReplicator replicator) {
            ProducerImpl currentProducer = replicator.producer;
            return currentProducer != null && currentProducer.isWritable();
        }

        private ProducerSendCallback(Recycler.Handle recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        /**
         * Obtains a callback instance from the recycler and binds it to the given send.
         *
         * @param replicator the owning replicator
         * @param entry the entry being replicated; released when the send completes
         * @param msg the message being sent; recycled when the send completes
         * @return the initialized callback
         */
        static ProducerSendCallback create(PersistentReplicator replicator, Entry entry, MessageImpl msg) {
            ProducerSendCallback sendCallback = RECYCLER.get();
            sendCallback.replicator = replicator;
            sendCallback.entry = entry;
            sendCallback.msg = msg;
            return sendCallback;
        }

        /** Clears all references, recycles the message if present, and returns this object to the recycler. */
        private void recycle() {
            replicator = null;
            entry = null;
            if (msg != null) {
                msg.recycle();
                msg = null;
            }
            RECYCLER.recycle(this, recyclerHandle);
        }

        /** No-op: this callback does not chain further callbacks. */
        @Override
        public void addCallback(SendCallback scb) {
        }

        /** @return always {@code null}; no chained callback is kept */
        @Override
        public SendCallback getNextSendCallback() {
            return null;
        }

        /** @return always {@code null}; completion is reported only through {@link #sendComplete(Exception)} */
        @Override
        public CompletableFuture<MessageId> getFuture() {
            return null;
        }
    }
}

