/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataEvent;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Publishes {@link MetadataEvent}s to a Pulsar topic and feeds events consumed from that
 * topic to the registered listeners.
 *
 * <p>Lifecycle is tracked by {@link State}: {@link #start()} moves the synchronizer from
 * {@code Init} through {@code Starting_Producer} and {@code Starting_Consumer} to
 * {@code Started}; {@link #closeAsync()} moves it to {@code Closing} and finally
 * {@code Closed}. Producer/consumer creation and closing are retried with a backoff on the
 * broker service executor.
 */
public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronizer {
    private static final Logger log = LoggerFactory.getLogger(PulsarMetadataEventSynchronizer.class);
    protected PulsarService pulsar;
    protected BrokerService brokerService;
    protected String topicName;
    protected PulsarClientImpl client;
    protected volatile Producer<MetadataEvent> producer;
    protected volatile Consumer<MetadataEvent> consumer;
    private final CopyOnWriteArrayList<Function<MetadataEvent, CompletableFuture<Void>>> listeners =
            new CopyOnWriteArrayList<>();
    static final AtomicReferenceFieldUpdater<PulsarMetadataEventSynchronizer, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(PulsarMetadataEventSynchronizer.class, State.class, "state");
    private volatile State state;
    public static final String SUBSCRIPTION_NAME = "metadata-syncer";
    /** Upper bound on messages queued in the producer. */
    private static final int MAX_PRODUCER_PENDING_SIZE = 1000;
    /** Delay before a failed publish is attempted again. */
    private static final int MESSAGE_RATE_BACKOFF_MS = 1000;
    /** Number of failed state transitions tolerated by {@link #closeAsync()} before giving up. */
    private static final int MAX_CLOSE_STATE_CHANGE_ATTEMPTS = 100;
    protected final Backoff backOff =
            new Backoff(100L, TimeUnit.MILLISECONDS, 1L, TimeUnit.MINUTES, 0L, TimeUnit.MILLISECONDS);
    private volatile CompletableFuture<Void> closeFuture;

    /**
     * Creates a synchronizer in the {@code Init} state.
     *
     * @param pulsar    broker service holder used to obtain the client, config and executor
     * @param topicName topic used to exchange metadata events; a blank value disables the synchronizer
     */
    public PulsarMetadataEventSynchronizer(PulsarService pulsar, String topicName) {
        this.pulsar = pulsar;
        this.brokerService = pulsar.getBrokerService();
        this.topicName = topicName;
        this.state = State.Init;
        if (StringUtils.isBlank(topicName)) {
            log.info("Metadata synchronizer is disabled");
        }
    }

    /**
     * Starts the producer (and, once it is ready, the consumer) unless the topic name is blank
     * or the synchronizer has already left the {@code Init} state.
     *
     * @throws PulsarServerException if the broker client cannot be obtained
     */
    public void start() throws PulsarServerException {
        if (StringUtils.isBlank(this.topicName)) {
            log.info("metadata topic doesn't exist.. skipping metadata synchronizer init..");
            return;
        }
        log.info("Metadata event synchronizer is starting on topic {}", this.topicName);
        this.client = (PulsarClientImpl) this.pulsar.getClient();
        if (STATE_UPDATER.compareAndSet(this, State.Init, State.Starting_Producer)) {
            startProducer();
        }
    }

    /**
     * Publishes the event to the metadata topic.
     *
     * @param event event to publish
     * @return future completed once the event is published, or failed with
     *         {@link IllegalStateException} when the producer is not available
     */
    public CompletableFuture<Void> notify(MetadataEvent event) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        publishAsync(event, future);
        return future;
    }

    /**
     * Registers a listener invoked for every event received from the metadata topic.
     *
     * @param listener callback returning a future that completes when the event has been applied
     */
    public void registerSyncListener(Function<MetadataEvent, CompletableFuture<Void>> listener) {
        this.listeners.add(listener);
    }

    /** @return the cluster name taken from the broker configuration */
    public String getClusterName() {
        return this.pulsar.getConfig().getClusterName();
    }

    /**
     * Sends the event and completes {@code future} on success. A failed send is retried after
     * {@link #MESSAGE_RATE_BACKOFF_MS}; the retry re-checks the producer state, so retries stop
     * (and the future fails) once the synchronizer is closing.
     */
    private void publishAsync(MetadataEvent event, CompletableFuture<Void> future) {
        // The state moves to Starting_Consumer just before the producer field is assigned,
        // so the reference is checked as well as the state.
        Producer<MetadataEvent> currentProducer = this.producer;
        if (!isProducerStarted() || currentProducer == null) {
            log.info("Producer is not started on {}, failed to publish {}", this.topicName, event);
            future.completeExceptionally(new IllegalStateException("producer is not started yet"));
            return;
        }
        currentProducer.newMessage().value(event).sendAsync().thenAccept(ignored -> {
            log.info("successfully published metadata change event {}", event);
            future.complete(null);
        }).exceptionally(ex -> {
            log.warn("failed to publish metadata update {}, will retry in {}",
                    this.topicName, MESSAGE_RATE_BACKOFF_MS, ex);
            this.pulsar.getBrokerService().executor().schedule(
                    () -> publishAsync(event, future), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    /**
     * Creates the producer asynchronously. On success the state advances to
     * {@code Starting_Consumer} and the consumer is started; if the state changed in the
     * meantime the freshly created producer is closed. Failures are retried with backoff.
     */
    private void startProducer() {
        if (isClosingOrClosed()) {
            log.info("[{}] Skip to start new producer because the synchronizer is closed", this.topicName);
            return;
        }
        if (this.producer != null) {
            log.error("[{}] Failed to start the producer because the producer has been set, state: {}",
                    this.topicName, this.state);
            return;
        }
        log.info("[{}] Starting producer", this.topicName);
        this.client.newProducer(Schema.AVRO(MetadataEvent.class))
                .topic(this.topicName)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .enableBatching(false)
                .sendTimeout(0, TimeUnit.SECONDS)
                .maxPendingMessages(MAX_PRODUCER_PENDING_SIZE)
                .createAsync()
                .thenAccept(prod -> {
                    this.backOff.reset();
                    if (STATE_UPDATER.compareAndSet(this, State.Starting_Producer, State.Starting_Consumer)) {
                        this.producer = prod;
                        log.info("producer is created successfully {}", this.topicName);
                        startConsumer();
                    } else {
                        State stateTransient = this.state;
                        log.info("[{}] Closing the new producer because the synchronizer state is {}",
                                prod, stateTransient);
                        CompletableFuture<Void> closeProducer = new CompletableFuture<>();
                        closeResource(prod::closeAsync, closeProducer);
                        closeProducer.thenRun(() -> log.info(
                                "[{}] Closed the new producer because the synchronizer state is {}",
                                prod, stateTransient));
                    }
                }).exceptionally(ex -> {
                    long waitTimeMs = this.backOff.next();
                    log.warn("[{}] Failed to create producer ({}), retrying in {} s",
                            this.topicName, ex.getMessage(), waitTimeMs / 1000.0);
                    this.brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
                    return null;
                });
    }

    /**
     * Subscribes to the metadata topic asynchronously. Each received message is handed to the
     * registered listeners and acknowledged once they all complete. On success the state
     * advances to {@code Started}; if the state changed in the meantime the new consumer is
     * closed. Failures are retried with backoff.
     */
    private void startConsumer() {
        if (isClosingOrClosed()) {
            log.info("[{}] Skip to start new consumer because the synchronizer is closed", this.topicName);
            return;
        }
        if (this.consumer != null) {
            log.error("[{}] Failed to start the consumer because the consumer has been set, state: {}",
                    this.topicName, this.state);
            return;
        }
        log.info("[{}] Starting consumer", this.topicName);
        ConsumerBuilder<MetadataEvent> consumerBuilder = this.client.newConsumer(Schema.AVRO(MetadataEvent.class))
                .topic(this.topicName)
                .subscriptionName(SUBSCRIPTION_NAME)
                .ackTimeout(60L, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Failover)
                .messageListener((c, msg) -> {
                    MetadataEvent event = msg.getValue();
                    log.info("Processing metadata event for {} with listeners {}",
                            event.getPath(), this.listeners.size());
                    try {
                        if (this.listeners.size() == 0) {
                            // Nobody to notify: acknowledge so the message is not redelivered.
                            c.acknowledgeAsync(msg);
                            return;
                        }
                        if (this.listeners.size() == 1) {
                            this.listeners.get(0).apply(event)
                                    .thenApply(ignored -> c.acknowledgeAsync(msg))
                                    .exceptionally(ex -> {
                                        log.warn("Failed to synchronize {} for {}",
                                                msg.getMessageId(), this.topicName, ex.getCause());
                                        return null;
                                    });
                        } else {
                            FutureUtil.waitForAll(this.listeners.stream()
                                            .map(listener -> listener.apply(event))
                                            .collect(Collectors.toList()))
                                    .thenApply(ignored -> c.acknowledgeAsync(msg))
                                    .exceptionally(ex -> {
                                        log.warn("Failed to synchronize {} for {}",
                                                msg.getMessageId(), this.topicName, ex);
                                        return null;
                                    });
                        }
                    } catch (Exception e) {
                        // Unacknowledged messages are left to the ack timeout configured above.
                        log.warn("Failed to synchronize {} for {}", msg.getMessageId(), this.topicName, e);
                    }
                });
        consumerBuilder.subscribeAsync().thenAccept(newConsumer -> {
            this.backOff.reset();
            if (STATE_UPDATER.compareAndSet(this, State.Starting_Consumer, State.Started)) {
                this.consumer = newConsumer;
                log.info("successfully created consumer {}", this.topicName);
            } else {
                State stateTransient = this.state;
                log.info("[{}] Closing the new consumer because the synchronizer state is {}",
                        this.topicName, stateTransient);
                CompletableFuture<Void> closeConsumer = new CompletableFuture<>();
                closeResource(newConsumer::closeAsync, closeConsumer);
                closeConsumer.thenRun(() -> log.info(
                        "[{}] Closed the new consumer because the synchronizer state is {}",
                        this.topicName, stateTransient));
            }
        }).exceptionally(ex -> {
            long waitTimeMs = this.backOff.next();
            log.warn("[{}] Failed to create consumer ({}), retrying in {} s",
                    this.topicName, ex.getMessage(), waitTimeMs / 1000.0);
            this.brokerService.executor().schedule(this::startConsumer, waitTimeMs, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    /** @return {@code true} once both producer and consumer have been created */
    public boolean isStarted() {
        return this.state == State.Started;
    }

    /** @return {@code true} while the state is {@code Starting_Consumer} or {@code Started} */
    public boolean isProducerStarted() {
        // Single volatile read so both comparisons see the same value.
        State current = this.state;
        return current == State.Starting_Consumer || current == State.Started;
    }

    /** @return {@code true} when the state is {@code Closing} or {@code Closed} */
    public boolean isClosingOrClosed() {
        State current = this.state;
        return current == State.Closing || current == State.Closed;
    }

    /**
     * Moves the synchronizer to {@code Closing}, closes the producer and consumer (retrying
     * on failure) and finally marks the state {@code Closed}.
     *
     * @return future completed after both resources are closed and the state is {@code Closed};
     *         the same future is returned to later callers. It fails only when the state could
     *         not be switched to {@code Closing} after repeated attempts.
     */
    public synchronized CompletableFuture<Void> closeAsync() {
        int attempts = 0;
        while (true) {
            if (isClosingOrClosed()) {
                return this.closeFuture;
            }
            if (STATE_UPDATER.compareAndSet(this, State.Init, State.Closing)
                    || STATE_UPDATER.compareAndSet(this, State.Starting_Producer, State.Closing)
                    || STATE_UPDATER.compareAndSet(this, State.Starting_Consumer, State.Closing)
                    || STATE_UPDATER.compareAndSet(this, State.Started, State.Closing)) {
                break;
            }
            // Bounded so that a state that keeps changing underneath us cannot spin forever.
            if (++attempts > MAX_CLOSE_STATE_CHANGE_ATTEMPTS) {
                log.error("Unexpected error: the state can not be changed to closing {}, state: {}",
                        this.topicName, this.state);
                return CompletableFuture.failedFuture(
                        new RuntimeException("Unexpected error, the state can not be changed to closing"));
            }
        }

        CompletableFuture<Void> closeProducer = new CompletableFuture<>();
        CompletableFuture<Void> closeConsumer = new CompletableFuture<>();
        if (this.producer == null) {
            closeProducer.complete(null);
        } else {
            closeResource(() -> this.producer.closeAsync(), closeProducer);
        }
        if (this.consumer == null) {
            closeConsumer.complete(null);
        } else {
            closeResource(() -> this.consumer.closeAsync(), closeConsumer);
        }
        closeProducer.thenRun(() -> log.info("Successfully close producer {}", this.topicName));
        closeConsumer.thenRun(() -> log.info("Successfully close consumer {}", this.topicName));

        // Expose the dependent stage so callers observing completion also observe state == Closed.
        this.closeFuture = FutureUtil.waitForAll(Arrays.asList(closeProducer, closeConsumer)).thenRun(() -> {
            this.state = State.Closed;
            log.info("Successfully close metadata store synchronizer {}", this.topicName);
        });
        return this.closeFuture;
    }

    /**
     * Invokes the supplied asynchronous close action and completes {@code future} once it
     * succeeds. A failed attempt is re-scheduled on the broker executor after a backoff delay.
     *
     * @param asyncCloseable supplier starting the close operation; {@code null} completes the future at once
     * @param future         future completed when the resource has been closed
     */
    private void closeResource(Supplier<CompletableFuture<Void>> asyncCloseable, CompletableFuture<Void> future) {
        if (asyncCloseable == null) {
            future.complete(null);
            return;
        }
        asyncCloseable.get().whenComplete((ignore, ex) -> {
            if (ex == null) {
                this.backOff.reset();
                future.complete(null);
                return;
            }
            long waitTimeMs = this.backOff.next();
            log.warn("[{}] Exception: '{}' occurred while trying to close the {}. Retrying again in {} s.",
                    this.topicName, ex.getMessage(), asyncCloseable.getClass().getSimpleName(),
                    waitTimeMs / 1000.0, ex);
            this.brokerService.executor().schedule(
                    () -> closeResource(asyncCloseable, future), waitTimeMs, TimeUnit.MILLISECONDS);
        });
    }

    @Generated
    public String getTopicName() {
        return this.topicName;
    }

    @Generated
    public State getState() {
        return this.state;
    }

    /** Lifecycle states, in the order they are normally traversed. */
    public static enum State {
        Init,
        Starting_Producer,
        Starting_Consumer,
        Started,
        Closing,
        Closed;

    }
}

