/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for a replicator that owns a producer towards a remote cluster and
 * tracks its lifecycle with the {@link State} machine
 * ({@code Stopped -> Starting -> Started -> Stopping -> Stopped}).
 *
 * <p>This class drives the {@code Stopped -> Starting} transition (in
 * {@link #startProducer()}), the {@code Starting/Started -> Stopping} transition
 * (in {@link #disconnect(boolean)}) and the return to {@code Stopped}. It never
 * assigns a non-null {@link #producer} nor moves the state to {@code Started}
 * itself; subclasses presumably do both from {@link #readEntries(Producer)} —
 * confirm against the concrete implementations.
 *
 * <p>The lifecycle methods are {@code synchronized} on this instance, while the
 * state itself is read and written through {@link #STATE_UPDATER} because
 * asynchronous callbacks also update it outside the monitor.
 */
public abstract class AbstractReplicator {
    protected final BrokerService brokerService;
    protected final String topicName;
    protected final String localCluster;
    protected final String remoteCluster;
    protected final PulsarClientImpl client;
    /** Current remote producer, or {@code null} when none is attached. */
    protected volatile ProducerImpl producer;
    protected final int producerQueueSize;
    protected final ProducerConfiguration producerConfiguration;
    // Retry delay source shared by start and close retries.
    // NOTE(review): arguments presumably mean 100 ms initial delay, 1 minute cap, no mandatory stop — confirm against Backoff.
    protected final Backoff backOff = new Backoff(100L, TimeUnit.MILLISECONDS, 1L, TimeUnit.MINUTES, 0L, TimeUnit.MILLISECONDS);
    protected final String replicatorPrefix;
    protected static final AtomicReferenceFieldUpdater<AbstractReplicator, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, State.class, "state");
    private volatile State state = State.Stopped;
    private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class);

    /**
     * Creates a replicator in the {@code Stopped} state with no producer attached.
     *
     * <p>The producer configuration is prepared here: send timeout 0, max pending
     * messages taken from the broker's replication producer queue size, and a
     * producer name built by {@link #getReplicatorName(String, String)}.
     *
     * @param topicName        topic the remote producer is created on
     * @param replicatorPrefix prefix used to build the producer name
     * @param localCluster     name of the local cluster (interned)
     * @param remoteCluster    name of the remote cluster (interned)
     * @param brokerService    broker service supplying the replication client, configuration and executor
     */
    public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster, BrokerService brokerService) {
        this.brokerService = brokerService;
        this.topicName = topicName;
        this.replicatorPrefix = replicatorPrefix;
        this.localCluster = localCluster.intern();
        this.remoteCluster = remoteCluster.intern();
        this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
        this.producer = null;
        this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
        this.producerConfiguration = new ProducerConfiguration();
        this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS);
        this.producerConfiguration.setMaxPendingMessages(this.producerQueueSize);
        this.producerConfiguration.setProducerName(AbstractReplicator.getReplicatorName(replicatorPrefix, localCluster));
        STATE_UPDATER.set(this, State.Stopped);
    }

    /**
     * Invoked with the freshly created remote producer once
     * {@link #startProducer()} succeeds in creating it.
     */
    protected abstract void readEntries(Producer var1);

    /** Returns the read position reported in the disconnect log line. */
    protected abstract Position getReplicatorReadPosition();

    /** Returns the backlog size consulted by {@link #disconnect(boolean)}. */
    protected abstract long getNumberOfEntriesInBacklog();

    /** Invoked after the producer has been closed successfully. */
    protected abstract void disableReplicatorRead();

    /** Returns the (mutable) configuration used when creating the remote producer. */
    public ProducerConfiguration getProducerConfiguration() {
        return this.producerConfiguration;
    }

    /** Returns the name of the remote cluster this replicator targets. */
    public String getRemoteCluster() {
        return this.remoteCluster;
    }

    /**
     * Starts creating the remote producer if the replicator is {@code Stopped}.
     *
     * <ul>
     *   <li>If a close is in progress ({@code Stopping}), the call reschedules itself
     *       after a back-off delay and returns.</li>
     *   <li>If the state is anything other than {@code Stopped}, the call only logs
     *       and returns.</li>
     *   <li>Otherwise the state becomes {@code Starting} and the producer is created
     *       asynchronously; on success it is handed to {@link #readEntries(Producer)}.
     *       On failure, if the state is still {@code Starting}, it is reset to
     *       {@code Stopped} and the start is retried after a back-off delay.</li>
     * </ul>
     */
    public synchronized void startProducer() {
        if (STATE_UPDATER.get(this) == State.Stopping) {
            long waitTimeMs = this.backOff.next();
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] waiting for producer to close before attempting to reconnect, retrying in {} s", this.topicName, this.localCluster, this.remoteCluster, (double) waitTimeMs / 1000.0);
            }
            this.brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
            return;
        }
        // Snapshot taken before the CAS so the log below reports the state that blocked the start.
        State state = STATE_UPDATER.get(this);
        if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) {
            if (state == State.Started) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] Replicator was already running", this.topicName, this.localCluster, this.remoteCluster);
                }
            } else {
                log.info("[{}][{} -> {}] Replicator already being started. Replicator state: {}", this.topicName, this.localCluster, this.remoteCluster, state);
            }
            return;
        }
        log.info("[{}][{} -> {}] Starting replicator", this.topicName, this.localCluster, this.remoteCluster);
        this.client.createProducerAsync(this.topicName, this.producerConfiguration)
                .thenAccept(this::readEntries)
                .exceptionally(ex -> {
                    // Also reached when readEntries itself throws, since this stage hangs off thenAccept.
                    if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
                        long waitTimeMs = this.backOff.next();
                        log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", this.topicName, this.localCluster, this.remoteCluster, ex.getMessage(), (double) waitTimeMs / 1000.0);
                        this.brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
                    } else {
                        // State changed underneath us (e.g. a disconnect); do not retry. Trailing ex is logged as the throwable.
                        log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", this.topicName, this.localCluster, this.remoteCluster, STATE_UPDATER.get(this), ex);
                    }
                    return null;
                });
    }

    /**
     * Closes the current producer, if any, and moves the replicator to {@code Stopped}.
     *
     * <p>With no producer attached the state is set to {@code Stopped} immediately.
     * Otherwise, once the close completes, the state is set to {@code Stopped}, the
     * producer reference is cleared and {@link #disableReplicatorRead()} is called.
     * If the close (or that follow-up) fails, another close attempt is scheduled
     * after a back-off delay.
     *
     * @return a completed future when there is no producer; otherwise the producer's
     *         own close future. Note that this future reflects only the first close
     *         attempt: it completes exceptionally on failure even though a retry
     *         has been scheduled.
     */
    protected synchronized CompletableFuture<Void> closeProducerAsync() {
        if (this.producer == null) {
            STATE_UPDATER.set(this, State.Stopped);
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> future = this.producer.closeAsync();
        future.thenRun(() -> {
            STATE_UPDATER.set(this, State.Stopped);
            this.producer = null;
            this.disableReplicatorRead();
        }).exceptionally(ex -> {
            long waitTimeMs = this.backOff.next();
            log.warn("[{}][{} -> {}] Exception: '{}' occured while trying to close the producer. retrying again in {} s", this.topicName, this.localCluster, this.remoteCluster, ex.getMessage(), (double) waitTimeMs / 1000.0);
            this.brokerService.executor().schedule(this::closeProducerAsync, waitTimeMs, TimeUnit.MILLISECONDS);
            return null;
        });
        return future;
    }

    /**
     * Disconnects the replicator regardless of backlog; equivalent to
     * {@code disconnect(false)}.
     */
    public CompletableFuture<Void> disconnect() {
        return this.disconnect(false);
    }

    /**
     * Disconnects the replicator by closing its producer.
     *
     * <ul>
     *   <li>If {@code failIfHasBacklog} is set and the backlog is non-empty, nothing is
     *       changed and the returned future fails with
     *       {@link BrokerServiceException.TopicBusyException}.</li>
     *   <li>If a close is already in progress ({@code Stopping}), a completed future
     *       is returned without waiting for that close.</li>
     *   <li>If a producer is attached and the state can be moved from
     *       {@code Starting} or {@code Started} to {@code Stopping}, the close is
     *       delegated to {@link #closeProducerAsync()}.</li>
     *   <li>Otherwise the state is forced to {@code Stopped}.</li>
     * </ul>
     *
     * @param failIfHasBacklog whether to refuse the disconnect while entries remain in the backlog
     * @return future describing the outcome as listed above
     */
    public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog) {
        if (failIfHasBacklog && this.getNumberOfEntriesInBacklog() > 0L) {
            CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
            disconnectFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Cannot close a replicator with backlog"));
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Replicator disconnect failed since topic has backlog", this.topicName, this.localCluster, this.remoteCluster);
            }
            return disconnectFuture;
        }
        if (STATE_UPDATER.get(this) == State.Stopping) {
            return CompletableFuture.completedFuture(null);
        }
        if (this.producer != null && (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping) || STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping))) {
            log.info("[{}][{} -> {}] Disconnect replicator at position {} with backlog {}", this.topicName, this.localCluster, this.remoteCluster, this.getReplicatorReadPosition(), this.getNumberOfEntriesInBacklog());
            return this.closeProducerAsync();
        }
        STATE_UPDATER.set(this, State.Stopped);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * No-op in this base class.
     *
     * @return an already-completed future (never {@code null}), so callers can
     *         chain on the result safely
     */
    public CompletableFuture<Void> remove() {
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Returns {@code true} when a producer is attached and reports itself writable.
     * The field is copied to a local first so a concurrent reset to {@code null}
     * cannot cause a NullPointerException between the check and the call.
     */
    protected boolean isWritable() {
        ProducerImpl current = this.producer;
        return current != null && current.isWritable();
    }

    /**
     * Extracts the last dot-separated segment of {@code remoteCursor}.
     *
     * <p>Trailing empty segments are dropped by {@link String#split(String)}, so
     * {@code "a.b."} yields {@code "b"}. An input consisting only of dots produces
     * no segments and results in an {@link ArrayIndexOutOfBoundsException}.
     *
     * @param remoteCursor dot-separated cursor name
     * @return the text after the last significant dot, or the whole input if it contains no dot
     */
    public static String getRemoteCluster(String remoteCursor) {
        String[] segments = remoteCursor.split("\\.");
        return segments[segments.length - 1];
    }

    /**
     * Builds the interned replicator name {@code "<replicatorPrefix>.<cluster>"}.
     *
     * @param replicatorPrefix name prefix
     * @param cluster          cluster name appended after the dot
     * @return the interned concatenation
     */
    public static String getReplicatorName(String replicatorPrefix, String cluster) {
        return (replicatorPrefix + "." + cluster).intern();
    }

    /** Lifecycle states of the replicator's producer. */
    protected static enum State {
        Stopped,
        Starting,
        Started,
        Stopping;

    }
}

