package io.cdap.cdap.common.zookeeper.coordination;

import com.google.common.base.Functions;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Service;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.cdap.common.zookeeper.ZKExtOperations;
import io.cdap.cdap.common.zookeeper.coordination.ResourceRequirement;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.ServiceDiscovered;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinator.class */
public final class ResourceCoordinator extends AbstractService {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceCoordinator.class);
    private final ZKClient zkClient;
    private final DiscoveryServiceClient discoveryService;
    private final AssignmentStrategy assignmentStrategy;
    // Latest decoded requirement, keyed by requirement name.
    private final Map<String, ResourceRequirement> requirements = new HashMap<String, ResourceRequirement>();
    // Current assignment per requirement name. Also read lazily by the supplier that saveAssignment
    // hands to ZKExtOperations.setOrCreate.
    private final Map<String, ResourceAssignment> assignments = new ConcurrentHashMap<String, ResourceAssignment>();
    // Active discovery watch per requirement name; entries are cancelled by the shutdown task.
    private final Map<String, CancellableServiceDiscovered> serviceDiscovered =
        new HashMap<String, CancellableServiceDiscovered>();
    // Single listener instance shared by every discovery watch registered by this coordinator.
    private final DiscoverableChangeListener discoverableListener = new DiscoverableChangeListener();
    // Created in doStart(); ZK future callbacks and discovery change notifications are dispatched on it.
    private ExecutorService executor;

    /* renamed from: io.cdap.cdap.common.zookeeper.coordination.ResourceCoordinator$9, reason: invalid class name */
    /* loaded from: input_file:io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinator$9.class */
    // Compiler-synthesised lookup table backing the enum switch in ResourceWatcher.process.
    // It is indexed by Watcher.Event.EventType.ordinal() and maps:
    //   NodeCreated         -> 1
    //   NodeChildrenChanged -> 2
    //   NodeDeleted         -> 3
    // Every other event type keeps the array default of 0 and therefore hits the switch's default branch.
    // NOTE(review): the NoSuchFieldError catches are presumably the usual javac guard against an enum
    // constant missing at runtime; they are intentionally empty.
    static /* synthetic */ class AnonymousClass9 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$zookeeper$Watcher$Event$EventType = new int[Watcher.Event.EventType.values().length];

        static {
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[Watcher.Event.EventType.NodeCreated.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[Watcher.Event.EventType.NodeChildrenChanged.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[Watcher.Event.EventType.NodeDeleted.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinator$CancellableServiceDiscovered.class */
    /**
     * Pairs a {@link ServiceDiscovered} with the handle returned when a change listener was registered
     * on it, so that the registration can be cancelled later.
     */
    public static final class CancellableServiceDiscovered implements Cancellable {
        private final Cancellable cancellable;
        private final ServiceDiscovered serviceDiscovered;

        /**
         * Registers {@code listener} on {@code discovered} and remembers both the discovery handle and
         * the resulting registration.
         *
         * @param discovered       the discovery handle to watch
         * @param listener         the listener to register for changes
         * @param callbackExecutor the executor passed to {@code watchChanges} together with the listener
         */
        private CancellableServiceDiscovered(ServiceDiscovered discovered,
                                             ServiceDiscovered.ChangeListener listener,
                                             Executor callbackExecutor) {
            this.serviceDiscovered = discovered;
            this.cancellable = discovered.watchChanges(listener, callbackExecutor);
        }

        /** Cancels the change-listener registration made in the constructor. */
        @Override // org.apache.twill.common.Cancellable
        public void cancel() {
            cancellable.cancel();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinator$DiscoverableChangeListener.class */
    /**
     * Discovery change listener that re-runs the assignment for the requirement whose name matches
     * the changed service. Changes for services without a known requirement are ignored.
     */
    public final class DiscoverableChangeListener implements ServiceDiscovered.ChangeListener {
        private DiscoverableChangeListener() {
        }

        public void onChange(ServiceDiscovered serviceDiscovered) {
            ResourceRequirement requirement = ResourceCoordinator.this.requirements.get(serviceDiscovered.getName());
            if (requirement == null) {
                // No requirement is registered under this service name; nothing to assign.
                return;
            }
            ResourceCoordinator.this.performAssignment(requirement, serviceDiscovered);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinator$ResourceRequirementWatcher.class */
    /**
     * Watcher for a single requirement node. When the node's data changes, the requirement is fetched
     * and processed again with this same watcher instance.
     */
    public final class ResourceRequirementWatcher implements Watcher {
        private final String path;

        private ResourceRequirementWatcher(String str) {
            this.path = str;
        }

        public void process(WatchedEvent watchedEvent) {
            if (!ResourceCoordinator.this.shouldProcess()) {
                return;
            }
            if (watchedEvent.getType() != Watcher.Event.EventType.NodeDataChanged) {
                // Only data changes are of interest for a requirement node.
                return;
            }
            ResourceCoordinator.this.fetchAndProcessRequirement(path, this);
        }
    }

    /* loaded from: input_file:io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinator$ResourceWatcher.class */
    private final class ResourceWatcher implements Watcher {
        private ResourceWatcher() {
        }

        public void process(WatchedEvent watchedEvent) {
            if (ResourceCoordinator.this.shouldProcess()) {
                switch (AnonymousClass9.$SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[watchedEvent.getType().ordinal()]) {
                    case 1:
                    case 2:
                        ResourceCoordinator.this.fetchAndProcessAllResources(this);
                        return;
                    case 3:
                        ResourceCoordinator.this.beginWatch(this);
                        return;
                    default:
                        return;
                }
            }
        }
    }

    /**
     * Creates a coordinator. Nothing is contacted here; the collaborators are only stored.
     *
     * @param zKClient               client used for all ZooKeeper reads, writes and watches
     * @param discoveryServiceClient client used to discover the services named by requirements
     * @param assignmentStrategy     strategy invoked to compute assignments
     */
    public ResourceCoordinator(ZKClient zKClient, DiscoveryServiceClient discoveryServiceClient, AssignmentStrategy assignmentStrategy) {
        this.assignmentStrategy = assignmentStrategy;
        this.discoveryService = discoveryServiceClient;
        this.zkClient = zKClient;
    }

    /**
     * Creates the single-threaded daemon executor named {@code resource-coordinator}, starts watching
     * the requirements node and reports the service as started.
     */
    @Override // com.google.common.util.concurrent.AbstractService
    protected void doStart() {
        // The executor must exist before beginWatch, which registers callbacks on it.
        this.executor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("resource-coordinator"));
        Watcher rootWatcher = wrapWatcher(new ResourceWatcher());
        beginWatch(rootWatcher);
        notifyStarted();
    }

    /**
     * Queues the shutdown task for a normal stop and then shuts the executor down. The executor is
     * shut down even if queuing the task throws.
     */
    @Override // com.google.common.util.concurrent.AbstractService
    protected void doStop() {
        Runnable shutdownTask = createShutdownTask(null);
        try {
            executor.execute(shutdownTask);
        } finally {
            executor.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Queues the shutdown task for a failure caused by {@code th} and then shuts the executor down.
     * The executor is shut down even if queuing the task throws.
     *
     * @param th the failure that the queued task reports through {@code notifyFailed}
     */
    public void doNotifyFailed(Throwable th) {
        Runnable failureTask = createShutdownTask(th);
        try {
            executor.execute(failureTask);
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Builds the task that finishes this service: it cancels every registered discovery listener
     * (logging, not propagating, any failure to cancel) and then reports the terminal state.
     *
     * @param th {@code null} to report a normal stop, otherwise the failure to report
     * @return the task; it is not run by this method
     */
    private Runnable createShutdownTask(final Throwable th) {
        return new Runnable() { // from class: io.cdap.cdap.common.zookeeper.coordination.ResourceCoordinator.1
            @Override // java.lang.Runnable
            public void run() {
                for (Cancellable registration : ResourceCoordinator.this.serviceDiscovered.values()) {
                    try {
                        registration.cancel();
                    } catch (Throwable cancelFailure) {
                        // Keep going so one bad listener does not prevent the others from being cancelled.
                        ResourceCoordinator.LOG.warn("Exception when cancelling service discovery listener.", cancelFailure);
                    }
                }
                if (th != null) {
                    ResourceCoordinator.this.notifyFailed(th);
                    return;
                }
                ResourceCoordinator.this.notifyStopped();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks whether the {@code /requirements} node exists, registering {@code watcher} on it. If it
     * exists, all requirements are fetched and processed; if the check fails, the service is failed.
     *
     * @param watcher the watcher to register with the exists call and to pass on when listing children
     */
    public void beginWatch(final Watcher watcher) {
        FutureCallback<Stat> onExists = new FutureCallback<Stat>() { // from class: io.cdap.cdap.common.zookeeper.coordination.ResourceCoordinator.2
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(Stat stat) {
                if (stat == null) {
                    // Node is absent; the exists watch registered above is all that is needed for now.
                    return;
                }
                ResourceCoordinator.this.fetchAndProcessAllResources(watcher);
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                ResourceCoordinator.LOG.error("Failed to call exists on ZK node {}{}", ResourceCoordinator.this.zkClient.getConnectString(), "/requirements", th);
                ResourceCoordinator.this.doNotifyFailed(th);
            }
        };
        Futures.addCallback(zkClient.exists("/requirements", watcher), wrapCallback(onExists), executor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Lists the children of {@code /requirements}, registering {@code watcher} as the child watch.
     *
     * <p>On success, every child is fetched and processed as a requirement with its own wrapped
     * {@link ResourceRequirementWatcher}. Requirements tracked in memory whose names are no longer
     * among the children are then dropped together with their assignment.
     *
     * <p>On failure, a {@code NoNodeException} means the requirements node does not exist, so the
     * coordinator falls back to {@link #beginWatch(Watcher)} and keeps running. Any other failure is
     * logged and fails the service.
     *
     * @param watcher the watcher to register on the requirements node
     */
    public void fetchAndProcessAllResources(final Watcher watcher) {
        Futures.addCallback(this.zkClient.getChildren("/requirements", watcher), wrapCallback(new FutureCallback<NodeChildren>() { // from class: io.cdap.cdap.common.zookeeper.coordination.ResourceCoordinator.3
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(NodeChildren nodeChildren) {
                ImmutableSet<String> children = ImmutableSet.copyOf(nodeChildren.getChildren());

                // Process every requirement node that currently exists.
                for (String child : children) {
                    String path = "/requirements/" + child;
                    ResourceCoordinator.this.fetchAndProcessRequirement(path, ResourceCoordinator.this.wrapWatcher(new ResourceRequirementWatcher(path)));
                }

                // Snapshot the difference first: the loop below mutates the requirements map.
                for (String removedName : ImmutableSet.copyOf(Sets.difference(ResourceCoordinator.this.requirements.keySet(), children))) {
                    ResourceRequirement removed = ResourceCoordinator.this.requirements.remove(removedName);
                    ResourceCoordinator.LOG.info("Requirement deleted {}", removed);
                    ResourceCoordinator.this.removeAssignment(removedName);
                }
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                if (th instanceof KeeperException.NoNodeException) {
                    // The requirements node is gone: wait for it to be created instead of failing.
                    ResourceCoordinator.this.beginWatch(watcher);
                    return;
                }
                ResourceCoordinator.LOG.error("Failed to getChildren on ZK node {}{}", ResourceCoordinator.this.zkClient.getConnectString(), "/requirements", th);
                ResourceCoordinator.this.doNotifyFailed(th);
            }
        }), this.executor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads the requirement stored at ZK path {@code str}, registering {@code watcher} on it, and
     * reacts to the decoded requirement:
     *
     * <ul>
     *   <li>empty node data is logged and ignored;</li>
     *   <li>a requirement equal to the one already known causes no further work;</li>
     *   <li>a requirement whose service is not yet being discovered starts discovery with the shared
     *       change listener;</li>
     *   <li>otherwise an assignment is performed against the existing discovery handle.</li>
     * </ul>
     *
     * Decode or processing exceptions are logged with the raw data; read failures are logged. Neither
     * fails the service.
     *
     * @param str     full ZK path of the requirement node
     * @param watcher watcher to register with the data read
     */
    public void fetchAndProcessRequirement(final String str, Watcher watcher) {
        FutureCallback<NodeData> onData = new FutureCallback<NodeData>() { // from class: io.cdap.cdap.common.zookeeper.coordination.ResourceCoordinator.4
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(NodeData nodeData) {
                byte[] payload = nodeData.getData();
                if (payload == null) {
                    ResourceCoordinator.LOG.warn("Ignore empty data in ZK node {}{}", ResourceCoordinator.this.zkClient.getConnectString(), str);
                    return;
                }
                try {
                    ResourceRequirement incoming = CoordinationConstants.RESOURCE_REQUIREMENT_CODEC.decode(payload);
                    ResourceCoordinator.LOG.info("Get requirement {}", incoming);

                    ResourceRequirement known = ResourceCoordinator.this.requirements.get(incoming.getName());
                    if (incoming.equals(known)) {
                        ResourceCoordinator.LOG.info("Requirement for {} is not changed. No assignment is needed. {} = {}", incoming.getName(), known, incoming);
                        return;
                    }
                    ResourceCoordinator.this.requirements.put(incoming.getName(), incoming);

                    CancellableServiceDiscovered watched = ResourceCoordinator.this.serviceDiscovered.get(incoming.getName());
                    if (watched != null) {
                        // Discovery is already running for this name: assign against its current view.
                        ResourceCoordinator.this.performAssignment(incoming, watched.serviceDiscovered);
                        return;
                    }
                    ServiceDiscovered discovered = ResourceCoordinator.this.discoveryService.discover(incoming.getName());
                    CancellableServiceDiscovered registration = new CancellableServiceDiscovered(discovered, ResourceCoordinator.this.discoverableListener, ResourceCoordinator.this.executor);
                    ResourceCoordinator.this.serviceDiscovered.put(incoming.getName(), registration);
                } catch (Exception e) {
                    ResourceCoordinator.LOG.warn("Failed to process requirement ZK node {}{}: {}", ResourceCoordinator.this.zkClient.getConnectString(), str, Bytes.toStringBinary(payload), e);
                }
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                ResourceCoordinator.LOG.error("Failed to getData on ZK node {}{}", ResourceCoordinator.this.zkClient.getConnectString(), str, th);
            }
        };
        Futures.addCallback(zkClient.getData(str, watcher), wrapCallback(onData), executor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Computes and saves a new assignment for {@code resourceRequirement} using the handlers currently
     * visible in {@code serviceDiscovered}.
     *
     * <p>If no assignment for the requirement is cached yet, this delegates to
     * {@code fetchAndPerformAssignment} and returns. Otherwise the entries of the cached assignment
     * that are still valid are carried over: the partition must still be in the requirement, the
     * replica id must be below the partition's replica count, and the handler must still be discovered.
     * The strategy is then asked to complete the assignment, unless there are no handlers or no
     * partitions, and the result is saved.
     *
     * @param resourceRequirement the requirement to satisfy
     * @param serviceDiscovered   the discovery handle providing the current handlers
     */
    public void performAssignment(ResourceRequirement resourceRequirement, ServiceDiscovered serviceDiscovered) {
        ResourceAssignment previous = this.assignments.get(resourceRequirement.getName());
        if (previous == null) {
            fetchAndPerformAssignment(resourceRequirement, serviceDiscovered);
            return;
        }

        ImmutableSortedSet<Discoverable> handlers = ImmutableSortedSet.copyOf(DiscoverableComparator.COMPARATOR, serviceDiscovered);
        LOG.info("Perform assign for requirement {}. Number of handlers is {}", resourceRequirement, handlers.size());

        // Partition name -> number of replicas required.
        Map<String, Integer> replicasByPartition = Maps.newHashMap();
        for (ResourceRequirement.Partition partition : resourceRequirement.getPartitions()) {
            replicasByPartition.put(partition.getName(), partition.getReplicas());
        }

        // Carry over only the parts of the previous assignment that are still valid.
        TreeMultimap<Discoverable, PartitionReplica> retained = TreeMultimap.create(DiscoverableComparator.COMPARATOR, PartitionReplica.COMPARATOR);
        for (Map.Entry<Discoverable, PartitionReplica> entry : previous.getAssignments().entries()) {
            Discoverable handler = entry.getKey();
            PartitionReplica replica = entry.getValue();
            Integer required = replicasByPartition.get(replica.getName());
            if (required == null) {
                continue;
            }
            if (replica.getReplicaId() >= required.intValue()) {
                continue;
            }
            if (!handlers.contains(handler)) {
                continue;
            }
            retained.put(handler, replica);
        }

        // NOTE(review): ResourceAssigner is kept as a raw type because its type parameters are not
        // visible from this file.
        ResourceAssigner assigner = DefaultResourceAssigner.create(retained);
        boolean nothingToAssign = handlers.isEmpty() || replicasByPartition.isEmpty();
        if (!nothingToAssign) {
            this.assignmentStrategy.assign(resourceRequirement, handlers, assigner);
        }
        saveAssignment(new ResourceAssignment(resourceRequirement.getName(), assigner.get()));
    }

    /**
     * Loads the stored assignment for {@code resourceRequirement} from {@code /assignments/<name>},
     * caches it, and then performs the assignment.
     *
     * <p>If the node has no data, cannot be decoded or cannot be read, an empty assignment is cached
     * instead. If an assignment has already been cached by the time the read succeeds, nothing is done.
     *
     * @param resourceRequirement the requirement whose stored assignment is needed
     * @param serviceDiscovered   the discovery handle to pass on to {@code performAssignment}
     */
    private void fetchAndPerformAssignment(final ResourceRequirement resourceRequirement, final ServiceDiscovered serviceDiscovered) {
        final String name = resourceRequirement.getName();
        FutureCallback<NodeData> onStoredAssignment = new FutureCallback<NodeData>() { // from class: io.cdap.cdap.common.zookeeper.coordination.ResourceCoordinator.5
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(NodeData nodeData) {
                if (ResourceCoordinator.this.assignments.get(name) != null) {
                    return;
                }
                byte[] stored = nodeData.getData();
                // Start from an empty assignment; it is replaced only if decoding succeeds.
                ResourceAssignment current = new ResourceAssignment(name);
                if (stored != null) {
                    try {
                        current = CoordinationConstants.RESOURCE_ASSIGNMENT_CODEC.decode(stored);
                    } catch (Throwable decodeFailure) {
                        ResourceCoordinator.LOG.warn("Failed to decode resource assignment. Perform assignment as if no assignment existed.", decodeFailure);
                    }
                }
                cacheAndAssign(current);
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                boolean missingNode = th instanceof KeeperException.NoNodeException;
                if (!missingNode) {
                    ResourceCoordinator.LOG.warn("Failed to fetch current assignment. Perform assignment as if no assignment existed.", th);
                }
                cacheAndAssign(new ResourceAssignment(name));
            }

            // Caches the given assignment under the requirement name and re-enters performAssignment.
            private void cacheAndAssign(ResourceAssignment assignment) {
                ResourceCoordinator.this.assignments.put(name, assignment);
                ResourceCoordinator.this.performAssignment(resourceRequirement, serviceDiscovered);
            }
        };
        Futures.addCallback(zkClient.getData("/assignments/" + name), onStoredAssignment, executor);
    }

    /**
     * Caches {@code resourceAssignment} and writes it to {@code /assignments/<name>}.
     *
     * <p>The value passed to {@code ZKExtOperations.setOrCreate} is a supplier that looks the
     * assignment up in the cache by name, not the instance handed to this method. A failed write is
     * logged and fails the service. An exception thrown while encoding or starting the write is only
     * logged.
     *
     * @param resourceAssignment the assignment to cache and persist
     */
    private void saveAssignment(ResourceAssignment resourceAssignment) {
        String assignmentName = resourceAssignment.getName();
        this.assignments.put(assignmentName, resourceAssignment);
        try {
            // Encoded up front; used only for the log messages in the callback below.
            final byte[] encoded = CoordinationConstants.RESOURCE_ASSIGNMENT_CODEC.encode(resourceAssignment);
            FutureCallback<ResourceAssignment> onSaved = new FutureCallback<ResourceAssignment>() { // from class: io.cdap.cdap.common.zookeeper.coordination.ResourceCoordinator.6
                @Override // com.google.common.util.concurrent.FutureCallback
                public void onSuccess(ResourceAssignment saved) {
                    ResourceCoordinator.LOG.info("Resource assignment updated for {}. {}", saved.getName(), Bytes.toString(encoded));
                }

                @Override // com.google.common.util.concurrent.FutureCallback
                public void onFailure(Throwable th) {
                    ResourceCoordinator.LOG.error("Failed to save assignment {}", Bytes.toStringBinary(encoded), th);
                    ResourceCoordinator.this.doNotifyFailed(th);
                }
            };
            Futures.addCallback(
                ZKExtOperations.setOrCreate(
                    this.zkClient,
                    "/assignments/" + assignmentName,
                    Suppliers.compose(Functions.forMap(this.assignments), Suppliers.ofInstance(assignmentName)),
                    CoordinationConstants.RESOURCE_ASSIGNMENT_CODEC,
                    10),
                onSaved,
                this.executor);
        } catch (Exception e) {
            LOG.error("Failed to save assignment: {}", assignmentName, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drops the cached assignment for {@code str} and requests deletion of its
     * {@code /assignments/<str>} node. The result of the delete request is not inspected.
     *
     * @param str name of the requirement whose assignment is removed
     */
    public void removeAssignment(String str) {
        String assignmentPath = "/assignments/" + str;
        assignments.remove(str);
        zkClient.delete(assignmentPath);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells whether events and callbacks should still be acted upon.
     *
     * @return {@code true} only while the service state is {@code STARTING} or {@code RUNNING}
     */
    public boolean shouldProcess() {
        Service.State current = state();
        if (current == Service.State.STARTING) {
            return true;
        }
        return current == Service.State.RUNNING;
    }

    /**
     * Decorates a callback so that both outcomes are silently dropped once {@link #shouldProcess()}
     * returns {@code false}.
     *
     * @param futureCallback the callback to guard
     * @param <T>            result type of the callback
     * @return a callback that forwards to {@code futureCallback} only while processing is allowed
     */
    private <T> FutureCallback<T> wrapCallback(final FutureCallback<T> futureCallback) {
        return new FutureCallback<T>() { // from class: io.cdap.cdap.common.zookeeper.coordination.ResourceCoordinator.7
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(T result) {
                if (!ResourceCoordinator.this.shouldProcess()) {
                    return;
                }
                futureCallback.onSuccess(result);
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable failure) {
                if (!ResourceCoordinator.this.shouldProcess()) {
                    return;
                }
                futureCallback.onFailure(failure);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Decorates a watcher so that events are silently dropped once {@link #shouldProcess()} returns
     * {@code false}.
     *
     * @param watcher the watcher to guard
     * @return a watcher that forwards to {@code watcher} only while processing is allowed
     */
    public Watcher wrapWatcher(final Watcher watcher) {
        return new Watcher() { // from class: io.cdap.cdap.common.zookeeper.coordination.ResourceCoordinator.8
            public void process(WatchedEvent event) {
                if (!ResourceCoordinator.this.shouldProcess()) {
                    return;
                }
                watcher.process(event);
            }
        };
    }
}
