/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractReplicator
implements Replicator {
    protected final BrokerService brokerService;
    protected final String localTopicName;
    protected final String localCluster;
    protected final String remoteTopicName;
    protected final String remoteCluster;
    protected final PulsarClientImpl replicationClient;
    protected final PulsarClientImpl client;
    protected String replicatorId;
    protected final Topic localTopic;
    protected volatile ProducerImpl producer;
    public static final String REPL_PRODUCER_NAME_DELIMITER = "-->";
    protected final int producerQueueSize;
    protected final ProducerBuilder<byte[]> producerBuilder;
    protected final Backoff backOff = new Backoff(100L, TimeUnit.MILLISECONDS, 1L, TimeUnit.MINUTES, 0L, TimeUnit.MILLISECONDS);
    protected final String replicatorPrefix;
    protected static final AtomicReferenceFieldUpdater<AbstractReplicator, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, State.class, "state");
    @VisibleForTesting
    protected volatile State state = State.Disconnected;
    private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class);

    public AbstractReplicator(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName, String replicatorPrefix, BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException {
        this.brokerService = brokerService;
        this.localTopic = localTopic;
        this.localTopicName = localTopic.getName();
        this.replicatorPrefix = replicatorPrefix;
        this.localCluster = localCluster.intern();
        this.remoteTopicName = remoteTopicName;
        this.remoteCluster = remoteCluster.intern();
        this.replicationClient = replicationClient;
        this.client = (PulsarClientImpl)brokerService.pulsar().getClient();
        this.producer = null;
        this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
        this.replicatorId = String.format("%s | %s", StringUtils.equals((CharSequence)this.localTopicName, (CharSequence)remoteTopicName) ? this.localTopicName : this.localTopicName + REPL_PRODUCER_NAME_DELIMITER + remoteTopicName, StringUtils.equals((CharSequence)localCluster, (CharSequence)remoteCluster) ? localCluster : localCluster + REPL_PRODUCER_NAME_DELIMITER + remoteCluster);
        this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(remoteTopicName).messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).sendTimeout(0, TimeUnit.SECONDS).maxPendingMessages(this.producerQueueSize).producerName(this.getProducerName());
        STATE_UPDATER.set(this, State.Disconnected);
    }

    protected abstract String getProducerName();

    protected abstract void setProducerAndTriggerReadEntries(Producer<byte[]> var1);

    protected abstract Position getReplicatorReadPosition();

    @Override
    public abstract long getNumberOfEntriesInBacklog();

    protected abstract void disableReplicatorRead();

    @Override
    public String getRemoteCluster() {
        return this.remoteCluster;
    }

    @Override
    public void startProducer() {
        ImmutablePair<Boolean, State> setStartingRes = this.compareSetAndGetState(State.Disconnected, State.Starting);
        if (!((Boolean)setStartingRes.getLeft()).booleanValue()) {
            if (setStartingRes.getRight() == State.Starting) {
                log.info("[{}] Skip the producer creation since other thread is doing starting, state : {}", (Object)this.replicatorId, (Object)this.state);
            } else if (setStartingRes.getRight() == State.Started) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Replicator was already running. state: {}", (Object)this.replicatorId, (Object)this.state);
                }
            } else if (setStartingRes.getRight() == State.Disconnecting) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Rep.producer is closing, delay to retry(wait the producer close success). state: {}", (Object)this.replicatorId, (Object)this.state);
                }
                this.delayStartProducerAfterDisconnected();
            } else {
                log.info("[{}] Skip the producer creation since the replicator state is : {}", (Object)this.replicatorId, (Object)this.state);
            }
            return;
        }
        log.info("[{}] Starting replicator", (Object)this.replicatorId);
        ProducerBuilderImpl builderImpl = (ProducerBuilderImpl)this.producerBuilder;
        builderImpl.getConf().setNonPartitionedTopicExpected(true);
        ((CompletableFuture)this.producerBuilder.createAsync().thenAccept(producer -> this.setProducerAndTriggerReadEntries((Producer<byte[]>)producer))).exceptionally(ex -> {
            ImmutablePair<Boolean, State> setDisconnectedRes = this.compareSetAndGetState(State.Starting, State.Disconnected);
            if (((Boolean)setDisconnectedRes.getLeft()).booleanValue()) {
                long waitTimeMs = this.backOff.next();
                log.warn("[{}] Failed to create remote producer ({}), retrying in {} s", new Object[]{this.replicatorId, ex.getMessage(), (double)waitTimeMs / 1000.0});
                this.scheduleCheckTopicActiveAndStartProducer(waitTimeMs);
            } else if (setDisconnectedRes.getRight() == State.Terminating || setDisconnectedRes.getRight() == State.Terminated) {
                log.info("[{}] Skip to create producer, because it has been terminated, state is : {}", (Object)this.replicatorId, (Object)this.state);
            } else {
                log.warn("[{}] Other thread will try to create the producer again. so skipped current one task. State is : {}", (Object)this.replicatorId, (Object)this.state);
            }
            return null;
        });
    }

    protected void delayStartProducerAfterDisconnected() {
        long waitTimeMs = this.backOff.next();
        if (log.isDebugEnabled()) {
            log.debug("[{}] waiting for producer to close before attempting to reconnect, retrying in {} s", (Object)this.replicatorId, (Object)((double)waitTimeMs / 1000.0));
        }
        this.scheduleCheckTopicActiveAndStartProducer(waitTimeMs);
    }

    protected void scheduleCheckTopicActiveAndStartProducer(long waitTimeMs) {
        this.brokerService.executor().schedule(() -> {
            if (this.state == State.Terminating || this.state == State.Terminated) {
                log.info("[{}] Skip scheduled to start the producer since the replicator state is : {}", (Object)this.replicatorId, (Object)this.state);
                return;
            }
            CompletableFuture topicFuture = (CompletableFuture)this.brokerService.getTopics().get((Object)this.localTopicName);
            if (topicFuture == null) {
                log.info("[{}] Skip scheduled to start the producer since the topic was closed successfully. And trigger a terminate.", (Object)this.replicatorId);
                this.terminate();
                return;
            }
            ((CompletableFuture)topicFuture.thenAccept(optional -> {
                if (optional.isEmpty()) {
                    log.info("[{}] Skip scheduled to start the producer since the topic was closed. And trigger a terminate.", (Object)this.replicatorId);
                    this.terminate();
                    return;
                }
                if (optional.get() != this.localTopic) {
                    log.info("[{}] Skip scheduled to start the producer since the topic was closed. And trigger a terminate.", (Object)this.replicatorId);
                    this.terminate();
                    return;
                }
                Replicator replicator = (Replicator)this.localTopic.getReplicators().get((Object)this.remoteCluster);
                if (replicator != this) {
                    log.info("[{}] Skip scheduled to start the producer since a new replicator has instead current one. And trigger a terminate.", (Object)this.replicatorId);
                    this.terminate();
                    return;
                }
                this.startProducer();
            })).exceptionally(ex -> {
                log.error("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and trigger a terminate. Replicator state: {}", new Object[]{this.localTopicName, this.replicatorId, STATE_UPDATER.get(this), ex});
                this.terminate();
                return null;
            });
        }, waitTimeMs, TimeUnit.MILLISECONDS);
    }

    protected CompletableFuture<Boolean> isLocalTopicActive() {
        CompletableFuture topicFuture = (CompletableFuture)this.brokerService.getTopics().get((Object)this.localTopicName);
        if (topicFuture == null) {
            return CompletableFuture.completedFuture(false);
        }
        return topicFuture.thenApplyAsync(optional -> {
            if (optional.isEmpty()) {
                return false;
            }
            return optional.get() == this.localTopic;
        }, (Executor)this.brokerService.executor());
    }

    @Override
    public CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer) {
        long backlog = this.getNumberOfEntriesInBacklog();
        if (failIfHasBacklog && backlog > 0L) {
            CompletableFuture<Void> disconnectFuture = new CompletableFuture<Void>();
            disconnectFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Cannot close a replicator with backlog"));
            if (log.isDebugEnabled()) {
                log.debug("[{}] Replicator disconnect failed since topic has backlog", (Object)this.replicatorId);
            }
            return disconnectFuture;
        }
        log.info("[{}] Disconnect replicator at position {} with backlog {}", new Object[]{this.replicatorId, this.getReplicatorReadPosition(), backlog});
        return this.closeProducerAsync(closeTheStartingProducer);
    }

    protected CompletableFuture<Void> closeProducerAsync(boolean closeTheStartingProducer) {
        ImmutablePair<Boolean, State> setDisconnectingRes = this.compareSetAndGetState(State.Started, State.Disconnecting);
        if (!((Boolean)setDisconnectingRes.getLeft()).booleanValue()) {
            if (setDisconnectingRes.getRight() == State.Starting) {
                if (closeTheStartingProducer) {
                    long waitTimeMs = this.backOff.next();
                    this.brokerService.executor().schedule(() -> this.closeProducerAsync(true), waitTimeMs, TimeUnit.MILLISECONDS);
                } else {
                    log.info("[{}] Skip current producer closing since the previous producer has been closed, and trying start a new one, state : {}", (Object)this.replicatorId, setDisconnectingRes.getRight());
                }
            } else if (setDisconnectingRes.getRight() == State.Disconnected || setDisconnectingRes.getRight() == State.Disconnecting) {
                log.info("[{}] Skip current producer closing since other thread did closing, state : {}", (Object)this.replicatorId, setDisconnectingRes.getRight());
            } else if (setDisconnectingRes.getRight() == State.Terminating || setDisconnectingRes.getRight() == State.Terminated) {
                log.info("[{}] Skip current producer closing since other thread is doing termination, state : {}", (Object)this.replicatorId, (Object)this.state);
            }
            log.info("[{}] Skip current termination since other thread is doing close producer or termination, state : {}", (Object)this.replicatorId, (Object)this.state);
            return CompletableFuture.completedFuture(null);
        }
        return this.doCloseProducerAsync((Producer<byte[]>)this.producer, () -> {
            ImmutablePair<Boolean, State> setDisconnectedRes = this.compareSetAndGetState(State.Disconnecting, State.Disconnected);
            if (((Boolean)setDisconnectedRes.getLeft()).booleanValue()) {
                this.producer = null;
                this.disableReplicatorRead();
                return;
            }
            if (setDisconnectedRes.getRight() == State.Terminating || setDisconnectingRes.getRight() == State.Terminated) {
                log.info("[{}] Skip setting state to terminated because it was terminated, state : {}", (Object)this.replicatorId, (Object)this.state);
            } else {
                log.warn("[{}] Other task has change the state to terminated. so skipped current one task. State is : {}", (Object)this.replicatorId, (Object)this.state);
            }
        });
    }

    protected CompletableFuture<Void> doCloseProducerAsync(Producer<byte[]> producer, Runnable actionAfterClosed) {
        CompletableFuture future = producer == null ? CompletableFuture.completedFuture(null) : producer.closeAsync();
        return ((CompletableFuture)future.thenRun(() -> actionAfterClosed.run())).exceptionally(ex -> {
            long waitTimeMs = this.backOff.next();
            log.warn("[{}] Exception: '{}' occurred while trying to close the producer. Replicator state: {}. Retrying again in {} s.", new Object[]{this.replicatorId, ex.getMessage(), this.state, (double)waitTimeMs / 1000.0});
            this.brokerService.executor().schedule(() -> this.doCloseProducerAsync(producer, actionAfterClosed), waitTimeMs, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    @Override
    public CompletableFuture<Void> terminate() {
        if (!this.tryChangeStatusToTerminating()) {
            log.info("[{}] Skip current termination since other thread is doing termination, state : {}", (Object)this.replicatorId, (Object)this.state);
            return CompletableFuture.completedFuture(null);
        }
        return this.doCloseProducerAsync((Producer<byte[]>)this.producer, () -> {
            STATE_UPDATER.set(this, State.Terminated);
            this.producer = null;
            this.disableReplicatorRead();
            this.doReleaseResources();
        });
    }

    protected void doReleaseResources() {
    }

    protected boolean tryChangeStatusToTerminating() {
        if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Terminating)) {
            return true;
        }
        if (STATE_UPDATER.compareAndSet(this, State.Started, State.Terminating)) {
            return true;
        }
        if (STATE_UPDATER.compareAndSet(this, State.Disconnecting, State.Terminating)) {
            return true;
        }
        return STATE_UPDATER.compareAndSet(this, State.Disconnected, State.Terminating);
    }

    public CompletableFuture<Void> remove() {
        return CompletableFuture.completedFuture(null);
    }

    protected boolean isWritable() {
        ProducerImpl producer = this.producer;
        return producer != null && producer.isWritable();
    }

    public static String getRemoteCluster(String remoteCursor) {
        String[] split = remoteCursor.split("\\.");
        return split[split.length - 1];
    }

    public static String getReplicatorName(String replicatorPrefix, String cluster) {
        return (replicatorPrefix + "." + cluster).intern();
    }

    public static CompletableFuture<Void> validatePartitionedTopicAsync(String topic, BrokerService brokerService) {
        TopicName topicName = TopicName.get((String)topic);
        return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources().partitionedTopicExistsAsync(topicName).thenCompose(isPartitionedTopic -> {
            if (isPartitionedTopic.booleanValue()) {
                String s = topicName + " is a partitioned-topic and replication can't be started for partitioned-producer ";
                log.error(s);
                return FutureUtil.failedFuture((Throwable)new BrokerServiceException.NamingException(s));
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    public State getState() {
        return this.state;
    }

    protected ImmutablePair<Boolean, State> compareSetAndGetState(State expect, State update) {
        State original1 = this.state;
        if (STATE_UPDATER.compareAndSet(this, expect, update)) {
            return ImmutablePair.of((Object)true, (Object)((Object)expect));
        }
        State original2 = this.state;
        if (original1 == original2) {
            return ImmutablePair.of((Object)false, (Object)((Object)original1));
        }
        return this.compareSetAndGetState(expect, update);
    }

    @Override
    public boolean isTerminated() {
        return this.state == State.Terminating || this.state == State.Terminated;
    }

    public static enum State {
        Disconnected,
        Starting,
        Started,
        Disconnecting,
        Terminating,
        Terminated;

    }
}

