package org.apache.hama.monitor.fd;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.distribution.NormalDistribution;
import org.apache.hama.HamaConfiguration;

/* loaded from: input_file:org/apache/hama/monitor/fd/UDPSupervisor.class */
public class UDPSupervisor implements Supervisor, Callable<Object> {
    public static final Log LOG = LogFactory.getLog(UDPSupervisor.class);
    private static final AtomicInteger WINDOW_SIZE = new AtomicInteger(100);
    private final ExecutorService receiver;
    private final ExecutorService supervisor;
    private final ScheduledExecutorService watcher;
    private final DatagramChannel channel;
    private final List<Node> nodes = new CopyOnWriteArrayList();
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final List<NodeEventListener> listeners = new CopyOnWriteArrayList();

    /* loaded from: input_file:org/apache/hama/monitor/fd/UDPSupervisor$Calculator.class */
    final class Calculator implements Callable<Boolean> {
        final Log LOG1 = LogFactory.getLog(Calculator.class);
        final Node target;

        Calculator(Node node) {
            this.target = node;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() throws Exception {
            double phi = this.target.phi(System.currentTimeMillis());
            if (this.LOG1.isDebugEnabled()) {
                this.LOG1.debug(this.target.getHost() + "'s phi value is " + phi);
            }
            boolean z = !Double.isInfinite(phi);
            if (this.LOG1.isDebugEnabled()) {
                this.LOG1.debug(this.target.getHost() + " is alive? " + z);
            }
            return Boolean.valueOf(z);
        }
    }

    /* loaded from: input_file:org/apache/hama/monitor/fd/UDPSupervisor$Hermes.class */
    final class Hermes implements Callable<Object> {
        private final Node node;
        private final long heartbeat;
        private final long sequence;

        public Hermes(Node node, long j, long j2) {
            int indexOf = UDPSupervisor.this.nodes.indexOf(node);
            Node node2 = null;
            if (1 == j) {
                if (-1 == indexOf) {
                    node2 = node;
                    UDPSupervisor.this.nodes.add(node2);
                } else {
                    node2 = (Node) UDPSupervisor.this.nodes.get(indexOf);
                    node2.reset();
                }
            } else if (-1 == indexOf) {
                UDPSupervisor.LOG.warn("Non existing host (" + node.getHost() + ") is sending heartbeat sequence " + j + "!!!");
            } else {
                node2 = (Node) UDPSupervisor.this.nodes.get(indexOf);
            }
            this.node = node2;
            if (null == this.node) {
                throw new NullPointerException("Node is not correctly assigned.");
            }
            this.heartbeat = j2;
            this.sequence = j;
        }

        @Override // java.util.concurrent.Callable
        public Object call() throws Exception {
            this.node.add(this.heartbeat);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hama/monitor/fd/UDPSupervisor$Node.class */
    public final class Node {
        final String host;
        final AtomicReference<Pair> pair;
        final int windowSize;

        Node(String str, int i) {
            this.pair = new AtomicReference<>(new Pair());
            this.host = str;
            this.windowSize = i;
            setSamplingWindow(new ArrayDeque<>(windowSize()));
        }

        final String getHost() {
            return this.host;
        }

        final void setLatestHeartbeat(long j) {
            this.pair.get().latestHeartbeat = j;
        }

        final long getLatestHeartbeat() {
            return this.pair.get().latestHeartbeat;
        }

        final ArrayDeque<Double> getSamplingWindow() {
            return this.pair.get().samplingWindow;
        }

        final void setSamplingWindow(ArrayDeque<Double> arrayDeque) {
            this.pair.get().samplingWindow = arrayDeque;
        }

        public void reset() {
            getSamplingWindow().clear();
            setLatestHeartbeat(0L);
        }

        final int windowSize() {
            return this.windowSize;
        }

        final Double[] samples() {
            return (Double[]) getSamplingWindow().toArray(new Double[getSamplingWindow().size()]);
        }

        public final void add(long j) {
            if (null == this.pair.get().samplingWindow) {
                throw new NullPointerException("Sampling windows not exist.");
            }
            if (0 != getLatestHeartbeat()) {
                if (getSamplingWindow().size() == windowSize()) {
                    getSamplingWindow().remove();
                }
                getSamplingWindow().add(new Double(j - getLatestHeartbeat()));
            }
            setLatestHeartbeat(j);
        }

        final double cdf(long j, Double[] dArr) {
            double cumulativeProbability = new NormalDistribution(mean(dArr), variance(dArr)).cumulativeProbability(j - getLatestHeartbeat());
            if (UDPSupervisor.LOG.isDebugEnabled()) {
                UDPSupervisor.LOG.debug("Calcuated cdf:" + cumulativeProbability + " END");
            }
            return cumulativeProbability;
        }

        public final double phi(long j) {
            return (-1.0d) * Math.log10(1.0d - cdf(j, samples()));
        }

        final double mean(Double[] dArr) {
            int length = dArr.length;
            if (0 >= length) {
                throw new RuntimeException("Samples data does not exist.");
            }
            double d = 0.0d;
            for (Double d2 : dArr) {
                d += d2.doubleValue();
            }
            return d / length;
        }

        final double variance(Double[] dArr) {
            int length = dArr.length;
            double mean = mean(dArr);
            double d = 0.0d;
            for (Double d2 : dArr) {
                double doubleValue = d2.doubleValue() - mean;
                d += doubleValue * doubleValue;
            }
            return d / length;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            return null != obj && getClass() == obj.getClass() && getHost().equals(((Node) obj).host);
        }

        public int hashCode() {
            return (37 * 17) + this.host.hashCode();
        }

        public String toString() {
            Double[] samples = samples();
            StringBuilder sb = new StringBuilder();
            for (Double d : samples) {
                sb.append(" " + d.doubleValue() + " ");
            }
            return "Node host:" + this.host + " mean:" + mean(samples) + " variance:" + variance(samples) + " samples:[" + sb.toString() + "]";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hama/monitor/fd/UDPSupervisor$Pair.class */
    public final class Pair {
        ArrayDeque<Double> samplingWindow;
        long latestHeartbeat;

        private Pair() {
        }
    }

    /* loaded from: input_file:org/apache/hama/monitor/fd/UDPSupervisor$Watcher.class */
    final class Watcher implements Runnable {
        final Log LOG2 = LogFactory.getLog(Watcher.class);
        final ExecutorService calculator = Executors.newSingleThreadExecutor();

        Watcher() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (!UDPSupervisor.this.listeners.isEmpty()) {
                    for (Node node : UDPSupervisor.this.nodes) {
                        String host = node.getHost();
                        for (NodeEventListener nodeEventListener : UDPSupervisor.this.listeners) {
                            for (NodeStatus nodeStatus : nodeEventListener.interest()) {
                                if (NodeStatus.Dead.equals(nodeStatus) && !((Boolean) this.calculator.submit(new Calculator(node)).get()).booleanValue()) {
                                    nodeEventListener.notify(nodeStatus, host);
                                }
                            }
                        }
                    }
                }
            } catch (InterruptedException e) {
                this.LOG2.warn("Calculator thread is interrupted.", e);
                Thread.currentThread().interrupt();
            } catch (ExecutionException e2) {
                this.LOG2.warn(e2);
            }
        }
    }

    public UDPSupervisor(HamaConfiguration hamaConfiguration) {
        DatagramChannel datagramChannel = null;
        try {
            datagramChannel = DatagramChannel.open();
        } catch (IOException e) {
            LOG.error("Fail to open udp channel.", e);
        }
        this.channel = datagramChannel;
        if (null == this.channel) {
            throw new NullPointerException("Channel can not be opened.");
        }
        try {
            this.channel.socket().bind(new InetSocketAddress(hamaConfiguration.getInt("bsp.monitor.fd.udp_port", 16384)));
        } catch (SocketException e2) {
            LOG.error("Unable to bind the udp socket.", e2);
        }
        WINDOW_SIZE.set(hamaConfiguration.getInt("bsp.monitor.fd.window_size", 100));
        this.receiver = Executors.newCachedThreadPool();
        this.supervisor = Executors.newSingleThreadExecutor();
        this.watcher = Executors.newSingleThreadScheduledExecutor();
    }

    @Override // org.apache.hama.monitor.fd.Supervisor
    public void register(NodeEventListener nodeEventListener) {
        this.listeners.add(nodeEventListener);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Node event listener " + nodeEventListener.name() + " is registered.");
        }
    }

    @Override // java.util.concurrent.Callable
    public Object call() throws Exception {
        ByteBuffer allocate = ByteBuffer.allocate(8);
        while (this.running.get()) {
            try {
                try {
                    String hostName = ((InetSocketAddress) this.channel.receive(allocate)).getHostName();
                    allocate.flip();
                    long j = allocate.getLong();
                    allocate.clear();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Seqence: " + j + " src host: " + hostName);
                    }
                    this.receiver.submit(new Hermes(new Node(hostName, WINDOW_SIZE.get()), j, System.currentTimeMillis()));
                } catch (IOException e) {
                    LOG.error("Problem in receiving packet from channel.", e);
                    Thread.currentThread().interrupt();
                    if (null == this.channel) {
                        return null;
                    }
                    try {
                        this.channel.socket().close();
                        this.channel.close();
                        return null;
                    } catch (IOException e2) {
                        LOG.error("Error closing supervisor channel.", e2);
                        return null;
                    }
                }
            } catch (Throwable th) {
                if (null != this.channel) {
                    try {
                        this.channel.socket().close();
                        this.channel.close();
                    } catch (IOException e3) {
                        LOG.error("Error closing supervisor channel.", e3);
                    }
                }
                throw th;
            }
        }
        if (null == this.channel) {
            return null;
        }
        try {
            this.channel.socket().close();
            this.channel.close();
            return null;
        } catch (IOException e4) {
            LOG.error("Error closing supervisor channel.", e4);
            return null;
        }
    }

    @Override // org.apache.hama.monitor.fd.Supervisor
    public void start() {
        if (!this.running.compareAndSet(false, true)) {
            throw new IllegalStateException("Supervisor is already started.");
        }
        this.supervisor.submit(this);
        this.watcher.scheduleAtFixedRate(new Watcher(), 0L, 1L, TimeUnit.SECONDS);
    }

    @Override // org.apache.hama.monitor.fd.Supervisor
    public void stop() {
        this.running.set(false);
        this.watcher.shutdown();
        this.receiver.shutdown();
        this.supervisor.shutdown();
    }

    public boolean isShutdown() {
        return this.channel.socket().isClosed() && !this.running.get();
    }
}
