/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.resourcegroup;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupNamespaceConfigListener;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Metadata-store listener that mirrors resource group configuration into the
 * {@link ResourceGroupService}.
 *
 * <p>On construction the listener registers itself with the metadata store behind
 * {@link ResourceGroupResources} and schedules an initial load of all resource groups.
 * Afterwards, notifications on resource group paths trigger either a full reconciliation
 * (children changed / node created) or an update of a single group (node modified).
 */
public class ResourceGroupConfigListener implements Consumer<Notification> {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupConfigListener.class);

    private final ResourceGroupService rgService;
    private final PulsarService pulsarService;
    private final ResourceGroupResources rgResources;
    /** Created only after the first successful load of all resource groups; {@code null} until then. */
    private volatile ResourceGroupNamespaceConfigListener rgNamespaceConfigListener;

    /**
     * Registers this listener with the resource group metadata store and schedules the
     * initial load on the Pulsar executor.
     *
     * @param rgService     service that holds the resource groups to keep in sync
     * @param pulsarService broker service providing the resources and the executor
     */
    public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) {
        this.rgService = rgService;
        this.pulsarService = pulsarService;
        this.rgResources = pulsarService.getPulsarResources().getResourcegroupResources();
        this.rgResources.getStore().registerListener(this);
        this.execute(() -> this.loadAllResourceGroupsWithRetryAsync(0L));
    }

    /**
     * Loads all resource groups and, on success, creates the namespace config listener if
     * it does not exist yet. On failure the load is rescheduled with a delay that grows
     * linearly with the attempt number (500ms, 1000ms, 1500ms, ...).
     *
     * @param retry number of attempts that have already failed
     */
    private void loadAllResourceGroupsWithRetryAsync(long retry) {
        this.loadAllResourceGroupsAsync().thenAccept(ignored -> {
            if (this.rgNamespaceConfigListener == null) {
                this.rgNamespaceConfigListener =
                        new ResourceGroupNamespaceConfigListener(this.rgService, this.pulsarService, this);
            }
        }).exceptionally(e -> {
            long nextRetry = retry + 1L;
            long delay = 500L * nextRetry;
            LOG.error("Failed to load all resource groups during initialization, retrying after {}ms: ", delay, e);
            this.schedule(() -> this.loadAllResourceGroupsWithRetryAsync(nextRetry), delay);
            return null;
        });
    }

    /**
     * Reconciles the resource groups known to {@link ResourceGroupService} with the names
     * listed in the metadata store: groups no longer listed are deleted, newly listed
     * groups are fetched and created.
     *
     * @return a future that completes once every newly listed group has been processed
     */
    private CompletableFuture<Void> loadAllResourceGroupsAsync() {
        return this.rgResources.listResourceGroupsAsync().thenCompose(rgList -> {
            Set<String> existingSet = this.rgService.resourceGroupGetAll();
            Set<String> newSet = new HashSet<>(rgList);

            // Known to the service but no longer present in the metadata store.
            for (String rgName : Sets.difference(existingSet, newSet)) {
                this.deleteResourceGroup(rgName);
            }

            // Present in the metadata store but not yet known to the service.
            ArrayList<CompletableFuture<Void>> futures = new ArrayList<>();
            for (String rgName : Sets.difference(newSet, existingSet)) {
                futures.add(this.rgResources.getResourceGroupAsync(rgName)
                        .thenAccept(optionalRg -> optionalRg.ifPresent(rg -> this.createResourceGroup(rgName, rg))));
            }
            return FutureUtil.waitForAll(futures);
        });
    }

    /**
     * Deletes the named resource group from the service if the service currently knows it.
     * Failures reported by the service are logged and not propagated.
     *
     * @param rgName name of the resource group to delete
     */
    public synchronized void deleteResourceGroup(String rgName) {
        try {
            if (this.rgService.resourceGroupGet(rgName) != null) {
                LOG.info("Deleting resource group {}", rgName);
                this.rgService.resourceGroupDelete(rgName);
            }
        } catch (PulsarAdminException e) {
            // Pass the exception as the trailing throwable (no placeholder) so the stack trace is kept.
            LOG.error("Got exception while deleting resource group {}", rgName, e);
        }
    }

    /**
     * Creates the named resource group in the service unless the service already knows it.
     * Failures reported by the service are logged and not propagated.
     *
     * @param rgName name of the resource group to create
     * @param rg     configuration of the resource group
     */
    public synchronized void createResourceGroup(String rgName, ResourceGroup rg) {
        if (this.rgService.resourceGroupGet(rgName) != null) {
            return;
        }
        LOG.info("Creating resource group {}, {}", rgName, rg.toString());
        try {
            this.rgService.resourceGroupCreate(rgName, rg);
        } catch (PulsarAdminException ex1) {
            LOG.error("Got an exception while creating RG {}", rgName, ex1);
        }
    }

    /**
     * Re-reads the named resource group from the metadata store and pushes the new
     * configuration to the service. Read or update failures are logged and not propagated.
     *
     * @param rgName name of the resource group that was modified
     */
    private void updateResourceGroup(String rgName) {
        this.rgResources.getResourceGroupAsync(rgName).whenComplete((optionalRg, ex) -> {
            if (ex != null) {
                LOG.error("Exception when getting resource group {}", rgName, ex);
                return;
            }
            if (!optionalRg.isPresent()) {
                // The read came back empty; calling get() here would throw inside this
                // callback and the failure would be lost with the discarded future.
                LOG.warn("Resource group {} not found in metadata store, skipping update", rgName);
                return;
            }
            ResourceGroup rg = optionalRg.get();
            try {
                LOG.info("Updating resource group {}, {}", rgName, rg);
                this.rgService.resourceGroupUpdate(rgName, rg);
            } catch (PulsarAdminException ex1) {
                LOG.error("Got an exception while updating resource group {}", rgName, ex1);
            }
        });
    }

    /**
     * Handles a metadata-store notification. Notifications for paths that are not resource
     * group paths are ignored. {@code ChildrenChanged} and {@code Created} trigger a full
     * reconciliation; {@code Modified} updates the single group named by the path, if the
     * path carries a group name. Other notification types are ignored.
     *
     * @param notification the metadata-store notification
     */
    @Override
    public void accept(Notification notification) {
        String notifyPath = notification.getPath();
        if (!ResourceGroupResources.isResourceGroupPath(notifyPath)) {
            return;
        }

        NotificationType type = notification.getType();
        LOG.info("Metadata store notification: Path {}, Type {}", notifyPath, type);

        if (type == NotificationType.ChildrenChanged || type == NotificationType.Created) {
            this.loadAllResourceGroupsAsync().exceptionally(ex -> {
                LOG.error("Exception when fetching resource groups", ex);
                return null;
            });
            return;
        }

        if (type == NotificationType.Modified) {
            Optional<String> rgName = ResourceGroupResources.resourceGroupNameFromPath(notifyPath);
            rgName.ifPresent(this::updateResourceGroup);
        }
    }

    /**
     * Runs the task on the Pulsar executor, wrapped so that thrown throwables are caught and logged.
     *
     * @param runnable task to run
     */
    protected void execute(Runnable runnable) {
        this.pulsarService.getExecutor().execute(Runnables.catchingAndLoggingThrowables(runnable));
    }

    /**
     * Schedules the task on the Pulsar executor after the given delay, wrapped so that
     * thrown throwables are caught and logged.
     *
     * @param runnable task to run
     * @param delayMs  delay before execution, in milliseconds
     */
    protected void schedule(Runnable runnable, long delayMs) {
        this.pulsarService.getExecutor()
                .schedule(Runnables.catchingAndLoggingThrowables(runnable), delayMs, TimeUnit.MILLISECONDS);
    }

    /**
     * @return the namespace config listener, or {@code null} if the initial load has not succeeded yet
     */
    @VisibleForTesting
    ResourceGroupNamespaceConfigListener getRgNamespaceConfigListener() {
        return this.rgNamespaceConfigListener;
    }
}

