/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.extensions.channel;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectWriter;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateMetadataStoreTableViewImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableView;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps two {@link ServiceUnitStateTableView} instances -- one backed by
 * {@link ServiceUnitStateTableViewImpl} (referred to here as the "system topic" view) and one backed by
 * {@link ServiceUnitStateMetadataStoreTableViewImpl} (the "metadata store" view) -- in sync.
 *
 * <p>{@link #start(PulsarService)} runs two phases:
 * <ol>
 *   <li>a bulk phase that empties the destination view and copies every existing entry of the source
 *       view into it, using short-lived views that are closed when the phase ends;</li>
 *   <li>a tail phase that starts two long-lived views whose listeners forward each received item to
 *       the opposite view.</li>
 * </ol>
 *
 * <p>NOTE(review): {@code start} and {@code close} are not synchronized; the visible code relies on
 * callers not invoking them concurrently -- confirm against the call sites.
 */
public class ServiceUnitStateTableViewSyncer implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ServiceUnitStateTableViewSyncer.class);

    /** Maximum number of put/delete operations kept in flight before the bulk phase waits for them. */
    private static final int MAX_CONCURRENT_SYNC_COUNT = 100;

    /** Maximum time, in seconds, to wait for both table views to report the same number of entries. */
    private static final int SYNC_WAIT_TIME_IN_SECS = 300;

    /** Pause between two size comparisons while waiting for the views to converge. */
    private static final long SYNC_POLL_INTERVAL_MILLIS = 100L;

    /**
     * Value handed to {@code ExtensibleLoadManagerImpl.configureSystemTopics} while the syncer starts.
     * Judging by the failure message it disables compaction -- presumably a compaction threshold; TODO confirm.
     */
    private static final long COMPACTION_THRESHOLD_ON_START = 0L;

    /**
     * Value handed to {@code ExtensibleLoadManagerImpl.configureSystemTopics} when the syncer closes
     * (0x500000 = 5 * 1024 * 1024). Judging by the failure message it re-enables compaction --
     * presumably a threshold in bytes; TODO confirm.
     */
    private static final long COMPACTION_THRESHOLD_ON_CLOSE = 0x500000L;

    /** Prefix prepended to each entry key when writing directly to the local metadata store. */
    private static final String METADATA_STORE_PATH_PREFIX = "/service_unit_state/";

    private PulsarService pulsar;
    private volatile ServiceUnitStateTableView systemTopicTableView;
    private volatile ServiceUnitStateTableView metadataStoreTableView;
    private volatile boolean isActive = false;

    /**
     * Starts the syncer: copies the existing items, reconfigures the system topics, then starts the
     * tail views. Does nothing when the syncer is disabled in the broker configuration or is already
     * active.
     *
     * <p>NOTE(review): when the tail phase fails, the tail views it created stay referenced by this
     * instance; they are only closed by the next {@code start} attempt, because {@link #close()}
     * returns early while the syncer is inactive -- confirm this is intended.
     *
     * @param pulsar the broker service used for configuration, metadata store access and table views
     * @throws IOException if a table view fails to start or close, or an entry cannot be serialized
     * @throws TimeoutException if an operation batch or the convergence wait exceeds its time limit
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if a put or delete operation completes exceptionally
     * @throws IllegalStateException if the system topics cannot be reconfigured
     */
    public void start(PulsarService pulsar)
            throws IOException, TimeoutException, InterruptedException, ExecutionException {
        if (!pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) {
            return;
        }
        if (isActive) {
            return;
        }
        this.pulsar = pulsar;
        try {
            syncExistingItems();
            if (!ExtensibleLoadManagerImpl.configureSystemTopics(pulsar, COMPACTION_THRESHOLD_ON_START)) {
                throw new IllegalStateException("Failed to disable compaction");
            }
            syncTailItems();
            isActive = true;
        } catch (Throwable e) {
            // Log here so the failure is recorded even if the caller drops the exception; rethrown as-is.
            log.error("Failed to start ServiceUnitStateTableViewSyncer", e);
            throw e;
        }
    }

    /** Listener that forwards an item received by the metadata store view to the system topic view. */
    private CompletableFuture<Void> syncToSystemTopic(String key, ServiceUnitStateData data) {
        return systemTopicTableView.put(key, data);
    }

    /** Listener that forwards an item received by the system topic view to the metadata store view. */
    private CompletableFuture<Void> syncToMetadataStore(String key, ServiceUnitStateData data) {
        return metadataStoreTableView.put(key, data);
    }

    /** No-op listener used wherever a table view requires a callback but nothing has to be forwarded. */
    private void dummy(String key, ServiceUnitStateData data) {
    }

    /**
     * Bulk phase: opens one short-lived view per backend, empties the destination view, copies every
     * entry of the source view into the destination, and waits until both views report the same size.
     * The copy direction is taken from the configured syncer type. Both views are closed on exit.
     *
     * @throws TimeoutException if a batch of operations times out or the views do not converge within
     *         {@link #SYNC_WAIT_TIME_IN_SECS} seconds
     */
    private void syncExistingItems()
            throws IOException, ExecutionException, InterruptedException, TimeoutException {
        long started = System.currentTimeMillis();
        ServiceUnitStateTableView metadataStoreView = new ServiceUnitStateMetadataStoreTableViewImpl();
        try {
            metadataStoreView.start(pulsar, this::dummy, this::dummy);
            ServiceUnitStateTableView systemTopicView = new ServiceUnitStateTableViewImpl();
            try {
                systemTopicView.start(pulsar, this::dummy, this::dummy);
                ServiceConfiguration.ServiceUnitTableViewSyncerType syncer =
                        pulsar.getConfiguration().getLoadBalancerServiceUnitTableViewSyncer();
                if (syncer == ServiceConfiguration.ServiceUnitTableViewSyncerType.SystemTopicToMetadataStoreSyncer) {
                    clean(metadataStoreView);
                    syncExistingItemsToMetadataStore(systemTopicView);
                } else {
                    clean(systemTopicView);
                    syncExistingItemsToSystemTopic(metadataStoreView, systemTopicView);
                }
                if (!waitUntilSynced(metadataStoreView, systemTopicView, started)) {
                    throw new TimeoutException(syncer
                            + " failed to sync existing items in tableviews. MetadataStoreTableView.size: "
                            + metadataStoreView.entrySet().size()
                            + ", SystemTopicTableView.size: " + systemTopicView.entrySet().size()
                            + " in " + SYNC_WAIT_TIME_IN_SECS + " secs");
                }
                log.info("Synced existing items MetadataStoreTableView.size:{} , "
                                + "SystemTopicTableView.size: {} in {} secs",
                        metadataStoreView.entrySet().size(),
                        systemTopicView.entrySet().size(),
                        TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started));
            } finally {
                systemTopicView.close();
            }
        } finally {
            metadataStoreView.close();
        }
    }

    /**
     * Tail phase: closes any tail views left over from an earlier attempt, starts fresh long-lived
     * views that forward items to each other, and waits until both report the same size.
     *
     * <p>The second argument of {@code start} is the forwarding listener and the third is a no-op;
     * presumably these are the tail-item and existing-item listeners respectively -- confirm against
     * {@link ServiceUnitStateTableView}.
     *
     * @throws TimeoutException if the views do not converge within {@link #SYNC_WAIT_TIME_IN_SECS} seconds
     */
    private void syncTailItems() throws InterruptedException, IOException, TimeoutException {
        long started = System.currentTimeMillis();
        if (metadataStoreTableView != null) {
            metadataStoreTableView.close();
            metadataStoreTableView = null;
        }
        if (systemTopicTableView != null) {
            systemTopicTableView.close();
            systemTopicTableView = null;
        }

        // Each field is assigned before start() so that a view whose start fails is still closed
        // by the next attempt.
        metadataStoreTableView = new ServiceUnitStateMetadataStoreTableViewImpl();
        metadataStoreTableView.start(pulsar, this::syncToSystemTopic, this::dummy);
        log.info("Started MetadataStoreTableView");

        systemTopicTableView = new ServiceUnitStateTableViewImpl();
        systemTopicTableView.start(pulsar, this::syncToMetadataStore, this::dummy);
        log.info("Started SystemTopicTableView");

        ServiceConfiguration.ServiceUnitTableViewSyncerType syncer =
                pulsar.getConfiguration().getLoadBalancerServiceUnitTableViewSyncer();
        if (!waitUntilSynced(metadataStoreTableView, systemTopicTableView, started)) {
            throw new TimeoutException(syncer
                    + " failed to sync tableviews. MetadataStoreTableView.size: "
                    + metadataStoreTableView.entrySet().size()
                    + ", SystemTopicTableView.size: " + systemTopicTableView.entrySet().size()
                    + " in " + SYNC_WAIT_TIME_IN_SECS + " secs");
        }
        log.info("Successfully started ServiceUnitStateTableViewSyncer MetadataStoreTableView.size:{} , "
                        + "SystemTopicTableView.size: {} in {} secs",
                metadataStoreTableView.entrySet().size(),
                systemTopicTableView.entrySet().size(),
                TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started));
    }

    /**
     * Copies every entry of {@code src} into the local metadata store, writing through the store
     * itself rather than through a table view, in batches of at most
     * {@link #MAX_CONCURRENT_SYNC_COUNT} concurrent writes.
     *
     * @param src the view whose entries are copied
     * @throws JsonProcessingException if an entry value cannot be serialized
     * @throws TimeoutException if a batch does not finish within the metadata store operation timeout
     */
    private void syncExistingItemsToMetadataStore(ServiceUnitStateTableView src)
            throws JsonProcessingException, ExecutionException, InterruptedException, TimeoutException {
        MetadataStoreExtended store = pulsar.getLocalMetadataStore();
        ObjectWriter writer = ObjectMapperFactory.getMapper().writer();
        int opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
        ArrayList<CompletableFuture<Void>> pending = new ArrayList<>();
        Iterator<Map.Entry<String, ServiceUnitStateData>> srcIter = src.entrySet().iterator();
        while (srcIter.hasNext()) {
            Map.Entry<String, ServiceUnitStateData> entry = srcIter.next();
            // The store's result is discarded; only completion matters to the batch.
            CompletableFuture<Void> written = store
                    .put(METADATA_STORE_PATH_PREFIX + entry.getKey(),
                            writer.writeValueAsBytes(entry.getValue()), Optional.empty())
                    .thenApply(__ -> null);
            pending.add(written);
            awaitBatch(pending, srcIter.hasNext(), opTimeout);
        }
    }

    /**
     * Copies every entry of {@code src} into {@code dst}, in batches of at most
     * {@link #MAX_CONCURRENT_SYNC_COUNT} concurrent puts.
     *
     * @param src the view whose entries are copied
     * @param dst the view that receives the entries
     * @throws TimeoutException if a batch does not finish within the metadata store operation timeout
     */
    private void syncExistingItemsToSystemTopic(ServiceUnitStateTableView src, ServiceUnitStateTableView dst)
            throws ExecutionException, InterruptedException, TimeoutException {
        int opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
        ArrayList<CompletableFuture<Void>> pending = new ArrayList<>();
        Iterator<Map.Entry<String, ServiceUnitStateData>> srcIter = src.entrySet().iterator();
        while (srcIter.hasNext()) {
            Map.Entry<String, ServiceUnitStateData> entry = srcIter.next();
            pending.add(dst.put(entry.getKey(), entry.getValue()));
            awaitBatch(pending, srcIter.hasNext(), opTimeout);
        }
    }

    /**
     * Deletes every entry currently visible in {@code dst}, in batches of at most
     * {@link #MAX_CONCURRENT_SYNC_COUNT} concurrent deletes.
     *
     * @param dst the view to empty
     * @throws TimeoutException if a batch does not finish within the metadata store operation timeout
     */
    private void clean(ServiceUnitStateTableView dst)
            throws ExecutionException, InterruptedException, TimeoutException {
        int opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
        ArrayList<CompletableFuture<Void>> pending = new ArrayList<>();
        Iterator<Map.Entry<String, ServiceUnitStateData>> dstIter = dst.entrySet().iterator();
        while (dstIter.hasNext()) {
            Map.Entry<String, ServiceUnitStateData> entry = dstIter.next();
            pending.add(dst.delete(entry.getKey()));
            awaitBatch(pending, dstIter.hasNext(), opTimeout);
        }
    }

    /**
     * Waits for the pending operations once the batch is full or the last item has been submitted,
     * then empties the list so that the next batch is counted from zero. Without the reset the list
     * would grow past the limit and only the first batch would ever be throttled.
     *
     * @param pending operations submitted since the last wait; cleared after a successful wait
     * @param moreToCome whether further items will be submitted after this call
     * @param opTimeoutSecs maximum time to wait for the batch, in seconds
     */
    private static void awaitBatch(ArrayList<CompletableFuture<Void>> pending, boolean moreToCome,
                                   int opTimeoutSecs)
            throws ExecutionException, InterruptedException, TimeoutException {
        if (pending.size() < MAX_CONCURRENT_SYNC_COUNT && moreToCome) {
            return;
        }
        FutureUtil.waitForAll(pending).get(opTimeoutSecs, TimeUnit.SECONDS);
        pending.clear();
    }

    /**
     * Polls until both views contain the same number of entries. Only the sizes are compared, not
     * the entries themselves.
     *
     * @param src one of the two views
     * @param dst the other view
     * @param started wall-clock time in milliseconds from which the time limit is measured
     * @return {@code true} once the sizes match, {@code false} if more than
     *         {@link #SYNC_WAIT_TIME_IN_SECS} whole seconds elapsed first
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    private boolean waitUntilSynced(ServiceUnitStateTableView src, ServiceUnitStateTableView dst, long started)
            throws InterruptedException {
        while (src.entrySet().size() != dst.entrySet().size()) {
            long elapsedSecs = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started);
            if (elapsedSecs > SYNC_WAIT_TIME_IN_SECS) {
                return false;
            }
            Thread.sleep(SYNC_POLL_INTERVAL_MILLIS);
        }
        return true;
    }

    /**
     * Stops the syncer: reconfigures the system topics, then closes the system topic view followed by
     * the metadata store view. Does nothing when the syncer is not active. If any step fails the
     * syncer stays marked active, so a later call repeats the steps for whatever is still open.
     *
     * @throws IOException if a table view fails to close
     * @throws IllegalStateException if the system topics cannot be reconfigured
     */
    @Override
    public void close() throws IOException {
        if (!isActive) {
            return;
        }
        if (!ExtensibleLoadManagerImpl.configureSystemTopics(pulsar, COMPACTION_THRESHOLD_ON_CLOSE)) {
            throw new IllegalStateException("Failed to enable compaction");
        }
        try {
            if (systemTopicTableView != null) {
                systemTopicTableView.close();
                systemTopicTableView = null;
                log.info("Closed SystemTopicTableView");
            }
        } catch (Exception e) {
            log.error("Failed to close SystemTopicTableView", e);
            throw e;
        }
        try {
            if (metadataStoreTableView != null) {
                metadataStoreTableView.close();
                metadataStoreTableView = null;
                log.info("Closed MetadataStoreTableView");
            }
        } catch (Exception e) {
            log.error("Failed to close MetadataStoreTableView", e);
            throw e;
        }
        log.info("Successfully closed ServiceUnitStateTableViewSyncer.");
        isActive = false;
    }

    /**
     * @return {@code true} after {@link #start(PulsarService)} has completed successfully and until
     *         {@link #close()} completes successfully
     */
    public boolean isActive() {
        return isActive;
    }
}

