package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.StringInterner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/AbstractReplicator.class */
public abstract class AbstractReplicator implements Replicator {
    protected final BrokerService brokerService;
    protected final String localTopicName;
    protected final String localCluster;
    protected final String remoteTopicName;
    protected final String remoteCluster;
    protected final PulsarClientImpl replicationClient;
    protected final PulsarClientImpl client;
    protected String replicatorId;
    protected final Topic localTopic;
    public static final String REPL_PRODUCER_NAME_DELIMITER = "-->";
    protected final int producerQueueSize;
    protected final ProducerBuilder<byte[]> producerBuilder;
    protected final String replicatorPrefix;
    protected static final AtomicReferenceFieldUpdater<AbstractReplicator, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, State.class, "state");
    private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class);
    protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);

    @VisibleForTesting
    protected volatile State state = State.Disconnected;
    protected volatile ProducerImpl producer = null;

    /* loaded from: input_file:org/apache/pulsar/broker/service/AbstractReplicator$State.class */
    public enum State {
        Disconnected,
        Starting,
        Started,
        Disconnecting,
        Terminating,
        Terminated
    }

    public AbstractReplicator(String str, Topic topic, String str2, String str3, String str4, BrokerService brokerService, PulsarClientImpl pulsarClientImpl) throws PulsarServerException {
        this.brokerService = brokerService;
        this.localTopic = topic;
        this.localTopicName = topic.getName();
        this.replicatorPrefix = str4;
        this.localCluster = StringInterner.intern(str);
        this.remoteTopicName = str3;
        this.remoteCluster = StringInterner.intern(str2);
        this.replicationClient = pulsarClientImpl;
        this.client = brokerService.pulsar().getClient();
        this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
        Object[] objArr = new Object[2];
        objArr[0] = StringUtils.equals(this.localTopicName, str3) ? this.localTopicName : this.localTopicName + "-->" + str3;
        objArr[1] = StringUtils.equals(str, str2) ? str : str + "-->" + str2;
        this.replicatorId = String.format("%s | %s", objArr);
        this.producerBuilder = pulsarClientImpl.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(str3).messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).sendTimeout(0, TimeUnit.SECONDS).maxPendingMessages(this.producerQueueSize).producerName(getProducerName());
        STATE_UPDATER.set(this, State.Disconnected);
    }

    protected abstract String getProducerName();

    protected abstract void setProducerAndTriggerReadEntries(org.apache.pulsar.client.api.Producer<byte[]> producer);

    protected abstract Position getReplicatorReadPosition();

    @Override // org.apache.pulsar.broker.service.Replicator
    public abstract long getNumberOfEntriesInBacklog();

    protected abstract void disableReplicatorRead();

    @Override // org.apache.pulsar.broker.service.Replicator
    public String getRemoteCluster() {
        return this.remoteCluster;
    }

    @Override // org.apache.pulsar.broker.service.Replicator
    public void startProducer() {
        ImmutablePair<Boolean, State> compareSetAndGetState = compareSetAndGetState(State.Disconnected, State.Starting);
        if (((Boolean) compareSetAndGetState.getLeft()).booleanValue()) {
            log.info("[{}] Starting replicator", this.replicatorId);
            this.producerBuilder.createAsync().thenAccept(producer -> {
                setProducerAndTriggerReadEntries(producer);
            }).exceptionally(th -> {
                ImmutablePair<Boolean, State> compareSetAndGetState2 = compareSetAndGetState(State.Starting, State.Disconnected);
                if (((Boolean) compareSetAndGetState2.getLeft()).booleanValue()) {
                    long next = this.backOff.next();
                    log.warn("[{}] Failed to create remote producer ({}), retrying in {} s", new Object[]{this.replicatorId, th.getMessage(), Double.valueOf(next / 1000.0d)});
                    scheduleCheckTopicActiveAndStartProducer(next);
                    return null;
                }
                if (compareSetAndGetState2.getRight() == State.Terminating || compareSetAndGetState2.getRight() == State.Terminated) {
                    log.info("[{}] Skip to create producer, because it has been terminated, state is : {}", this.replicatorId, this.state);
                    return null;
                }
                log.warn("[{}] Other thread will try to create the producer again. so skipped current one task. State is : {}", this.replicatorId, this.state);
                return null;
            });
            return;
        }
        if (compareSetAndGetState.getRight() == State.Starting) {
            log.info("[{}] Skip the producer creation since other thread is doing starting, state : {}", this.replicatorId, this.state);
            return;
        }
        if (compareSetAndGetState.getRight() == State.Started) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Replicator was already running. state: {}", this.replicatorId, this.state);
            }
        } else {
            if (compareSetAndGetState.getRight() != State.Disconnecting) {
                log.info("[{}] Skip the producer creation since the replicator state is : {}", this.replicatorId, this.state);
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Rep.producer is closing, delay to retry(wait the producer close success). state: {}", this.replicatorId, this.state);
            }
            delayStartProducerAfterDisconnected();
        }
    }

    protected void delayStartProducerAfterDisconnected() {
        long next = this.backOff.next();
        if (log.isDebugEnabled()) {
            log.debug("[{}] waiting for producer to close before attempting to reconnect, retrying in {} s", this.replicatorId, Double.valueOf(next / 1000.0d));
        }
        scheduleCheckTopicActiveAndStartProducer(next);
    }

    protected void scheduleCheckTopicActiveAndStartProducer(long j) {
        this.brokerService.executor().schedule(() -> {
            if (this.state == State.Terminating || this.state == State.Terminated) {
                log.info("[{}] Skip scheduled to start the producer since the replicator state is : {}", this.replicatorId, this.state);
                return;
            }
            CompletableFuture completableFuture = (CompletableFuture) this.brokerService.getTopics().get(this.localTopicName);
            if (completableFuture != null) {
                completableFuture.thenAccept(optional -> {
                    if (optional.isEmpty()) {
                        log.info("[{}] Skip scheduled to start the producer since the topic was closed. And trigger a terminate.", this.replicatorId);
                        terminate();
                    } else if (optional.get() != this.localTopic) {
                        log.info("[{}] Skip scheduled to start the producer since the topic was closed. And trigger a terminate.", this.replicatorId);
                        terminate();
                    } else if (((Replicator) this.localTopic.getReplicators().get(this.remoteCluster)) == this) {
                        startProducer();
                    } else {
                        log.info("[{}] Skip scheduled to start the producer since a new replicator has instead current one. And trigger a terminate.", this.replicatorId);
                        terminate();
                    }
                }).exceptionally(th -> {
                    log.error("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and trigger a terminate. Replicator state: {}", new Object[]{this.localTopicName, this.replicatorId, STATE_UPDATER.get(this), th});
                    terminate();
                    return null;
                });
            } else {
                log.info("[{}] Skip scheduled to start the producer since the topic was closed successfully. And trigger a terminate.", this.replicatorId);
                terminate();
            }
        }, j, TimeUnit.MILLISECONDS);
    }

    protected CompletableFuture<Boolean> isLocalTopicActive() {
        CompletableFuture completableFuture = (CompletableFuture) this.brokerService.getTopics().get(this.localTopicName);
        return completableFuture == null ? CompletableFuture.completedFuture(false) : completableFuture.thenApplyAsync(optional -> {
            if (optional.isEmpty()) {
                return false;
            }
            return Boolean.valueOf(optional.get() == this.localTopic);
        }, (Executor) this.brokerService.executor());
    }

    @Override // org.apache.pulsar.broker.service.Replicator
    public CompletableFuture<Void> disconnect(boolean z, boolean z2) {
        long numberOfEntriesInBacklog = getNumberOfEntriesInBacklog();
        if (!z || numberOfEntriesInBacklog <= 0) {
            log.info("[{}] Disconnect replicator at position {} with backlog {}", new Object[]{this.replicatorId, getReplicatorReadPosition(), Long.valueOf(numberOfEntriesInBacklog)});
            return closeProducerAsync(z2);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        completableFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Cannot close a replicator with backlog"));
        if (log.isDebugEnabled()) {
            log.debug("[{}] Replicator disconnect failed since topic has backlog", this.replicatorId);
        }
        return completableFuture;
    }

    protected CompletableFuture<Void> closeProducerAsync(boolean z) {
        ImmutablePair<Boolean, State> compareSetAndGetState = compareSetAndGetState(State.Started, State.Disconnecting);
        if (((Boolean) compareSetAndGetState.getLeft()).booleanValue()) {
            return doCloseProducerAsync(this.producer, () -> {
                ImmutablePair<Boolean, State> compareSetAndGetState2 = compareSetAndGetState(State.Disconnecting, State.Disconnected);
                if (((Boolean) compareSetAndGetState2.getLeft()).booleanValue()) {
                    this.producer = null;
                    disableReplicatorRead();
                } else if (compareSetAndGetState2.getRight() == State.Terminating || compareSetAndGetState.getRight() == State.Terminated) {
                    log.info("[{}] Skip setting state to terminated because it was terminated, state : {}", this.replicatorId, this.state);
                } else {
                    log.warn("[{}] Other task has change the state to terminated. so skipped current one task. State is : {}", this.replicatorId, this.state);
                }
            });
        }
        if (compareSetAndGetState.getRight() == State.Starting) {
            if (z) {
                this.brokerService.executor().schedule(() -> {
                    return closeProducerAsync(true);
                }, this.backOff.next(), TimeUnit.MILLISECONDS);
            } else {
                log.info("[{}] Skip current producer closing since the previous producer has been closed, and trying start a new one, state : {}", this.replicatorId, compareSetAndGetState.getRight());
            }
        } else if (compareSetAndGetState.getRight() == State.Disconnected || compareSetAndGetState.getRight() == State.Disconnecting) {
            log.info("[{}] Skip current producer closing since other thread did closing, state : {}", this.replicatorId, compareSetAndGetState.getRight());
        } else if (compareSetAndGetState.getRight() == State.Terminating || compareSetAndGetState.getRight() == State.Terminated) {
            log.info("[{}] Skip current producer closing since other thread is doing termination, state : {}", this.replicatorId, this.state);
        }
        log.info("[{}] Skip current termination since other thread is doing close producer or termination, state : {}", this.replicatorId, this.state);
        return CompletableFuture.completedFuture(null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Void> doCloseProducerAsync(org.apache.pulsar.client.api.Producer<byte[]> producer, Runnable runnable) {
        return (producer == null ? CompletableFuture.completedFuture(null) : producer.closeAsync()).thenRun(() -> {
            runnable.run();
        }).exceptionally(th -> {
            long next = this.backOff.next();
            log.warn("[{}] Exception: '{}' occurred while trying to close the producer. Replicator state: {}. Retrying again in {} s.", new Object[]{this.replicatorId, th.getMessage(), this.state, Double.valueOf(next / 1000.0d)});
            this.brokerService.executor().schedule(() -> {
                return doCloseProducerAsync(producer, runnable);
            }, next, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    @Override // org.apache.pulsar.broker.service.Replicator
    public CompletableFuture<Void> terminate() {
        if (tryChangeStatusToTerminating()) {
            return doCloseProducerAsync(this.producer, () -> {
                STATE_UPDATER.set(this, State.Terminated);
                this.producer = null;
                disableReplicatorRead();
                doReleaseResources();
            });
        }
        log.info("[{}] Skip current termination since other thread is doing termination, state : {}", this.replicatorId, this.state);
        return CompletableFuture.completedFuture(null);
    }

    protected void doReleaseResources() {
    }

    protected boolean tryChangeStatusToTerminating() {
        return STATE_UPDATER.compareAndSet(this, State.Starting, State.Terminating) || STATE_UPDATER.compareAndSet(this, State.Started, State.Terminating) || STATE_UPDATER.compareAndSet(this, State.Disconnecting, State.Terminating) || STATE_UPDATER.compareAndSet(this, State.Disconnected, State.Terminating);
    }

    public CompletableFuture<Void> remove() {
        return CompletableFuture.completedFuture(null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isWritable() {
        ProducerImpl producerImpl = this.producer;
        return producerImpl != null && producerImpl.isWritable();
    }

    public static String getRemoteCluster(String str) {
        String[] split = str.split("\\.");
        return split[split.length - 1];
    }

    public static String getReplicatorName(String str, String str2) {
        return StringInterner.intern(str + "." + str2);
    }

    public static CompletableFuture<Void> validatePartitionedTopicAsync(String str, BrokerService brokerService) {
        TopicName topicName = TopicName.get(str);
        return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources().partitionedTopicExistsAsync(topicName).thenCompose(bool -> {
            if (!bool.booleanValue()) {
                return CompletableFuture.completedFuture(null);
            }
            String str2 = topicName + " is a partitioned-topic and replication can't be started for partitioned-producer ";
            log.error(str2);
            return FutureUtil.failedFuture(new BrokerServiceException.NamingException(str2));
        });
    }

    public State getState() {
        return this.state;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ImmutablePair<Boolean, State> compareSetAndGetState(State state, State state2) {
        State state3 = this.state;
        return STATE_UPDATER.compareAndSet(this, state, state2) ? ImmutablePair.of(true, state) : state3 == this.state ? ImmutablePair.of(false, state3) : compareSetAndGetState(state, state2);
    }

    @Override // org.apache.pulsar.broker.service.Replicator
    public boolean isTerminated() {
        return this.state == State.Terminating || this.state == State.Terminated;
    }
}
