package org.apache.hama.monitor.fd;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;

/* loaded from: input_file:org/apache/hama/monitor/fd/UDPSensor.class */
/**
 * Sensor that periodically sends a heartbeat datagram over UDP.
 *
 * <p>Each heartbeat is an 8-byte payload holding a monotonically increasing
 * sequence number, sent to the host and port read from the configuration.
 * After {@link #start()} the heartbeats are emitted from a dedicated
 * single-thread executor until {@link #stop()} is called, the sending thread
 * is interrupted, or the underlying channel is closed.
 */
public class UDPSensor implements Sensor, Callable<Object> {
    public static final Log LOG = LogFactory.getLog(UDPSensor.class);

    /** Destination host of the heartbeat datagrams. */
    private final String host;
    /** Destination port of the heartbeat datagrams. */
    private final int port;
    /**
     * Pause between two heartbeats, in milliseconds. Kept per instance so that
     * creating another sensor cannot change the interval of this one.
     */
    private final long heartbeatInterval;
    private final DatagramChannel channel;
    /** True between a successful {@link #start()} and {@link #stop()}. */
    private final AtomicBoolean running = new AtomicBoolean(false);
    /** Sequence number of the most recently issued heartbeat. */
    private final AtomicLong sequence = new AtomicLong(0);
    private final ExecutorService scheduler;

    /**
     * Creates a sensor configured from the given configuration and opens the
     * datagram channel used for sending.
     *
     * @param hamaConfiguration source of {@code bsp.monitor.fd.udp_host},
     *        {@code bsp.monitor.fd.udp_port} and
     *        {@code bsp.monitor.fd.heartbeat_interval}
     * @throws NullPointerException if the datagram channel cannot be opened;
     *         the underlying {@link IOException} is attached as the cause
     */
    public UDPSensor(HamaConfiguration hamaConfiguration) {
        this.host = hamaConfiguration.get("bsp.monitor.fd.udp_host", "localhost");
        // NOTE(review): a buffer-size constant as the default port looks like a
        // same-valued constant substituted by the decompiler - confirm against
        // the original source before changing it.
        this.port = hamaConfiguration.getInt("bsp.monitor.fd.udp_port", Constants.BUFFER_DEFAULT_SIZE);
        this.heartbeatInterval = hamaConfiguration.getInt("bsp.monitor.fd.heartbeat_interval", 1000);

        DatagramChannel opened;
        try {
            opened = DatagramChannel.open();
        } catch (IOException e) {
            LOG.error("Unable to open datagram channel.", e);
            // Exception type and message are kept for existing callers; the
            // cause is attached so the real failure is not lost.
            NullPointerException failure = new NullPointerException("Fail to open udp channel.");
            failure.initCause(e);
            throw failure;
        }
        this.channel = opened;
        this.scheduler = Executors.newSingleThreadExecutor();
    }

    /**
     * Sends a single heartbeat carrying the next sequence number.
     *
     * @throws IOException if the datagram cannot be sent
     */
    @Override
    public void heartbeat() throws IOException {
        // Capture the value once so the log line reports exactly what was sent,
        // even if another caller increments the counter in between.
        long sent = this.sequence.incrementAndGet();
        ByteBuffer payload = ByteBuffer.allocate(8);
        payload.putLong(sent);
        payload.flip();
        this.channel.send(payload, new InetSocketAddress(this.host, this.port));
        if (LOG.isDebugEnabled()) {
            LOG.debug("Heartbeat sequence " + sent + " is sent to " + this.host + ":" + this.port);
        }
    }

    /**
     * @return the destination host of the heartbeats
     */
    public String getHost() {
        return this.host;
    }

    /**
     * @return the destination port of the heartbeats
     */
    public int getPort() {
        return this.port;
    }

    /**
     * @return the pause between two heartbeats, in milliseconds
     */
    public long heartbeatInterval() {
        return this.heartbeatInterval;
    }

    /**
     * Heartbeat loop executed on the sensor's executor thread.
     *
     * <p>Sends a heartbeat, then sleeps for the configured interval, until the
     * sensor is stopped, the thread is interrupted, or the channel is closed.
     * A failed send is logged and retried after the regular interval.
     *
     * @return always {@code null}
     */
    @Override
    public Object call() throws Exception {
        // A closed channel can never be reopened (the field is final), so
        // there is no point in looping once it is gone.
        while (this.running.get() && this.channel.isOpen()) {
            try {
                heartbeat();
            } catch (IOException e) {
                LOG.error("Sensor fails in sending heartbeat.", e);
            }
            // Sleep outside the send's try block so that a failing send is
            // retried at the heartbeat rate instead of in a tight loop.
            try {
                Thread.sleep(this.heartbeatInterval);
            } catch (InterruptedException e) {
                LOG.error("UDPSensor is interrupted.", e);
                Thread.currentThread().interrupt();
                // With the interrupt flag restored every further sleep would
                // fail immediately; leave the loop instead of spinning.
                break;
            }
        }
        LOG.info("Sensor at " + this.host + " stops sending heartbeat.");
        return null;
    }

    /**
     * Starts sending heartbeats on the background executor.
     *
     * @throws IllegalStateException if the sensor is already started
     */
    @Override
    public void start() {
        if (!this.running.compareAndSet(false, true)) {
            throw new IllegalStateException("Sensor is already started.");
        }
        this.scheduler.submit(this);
    }

    /**
     * Stops the heartbeat loop, closes the channel and shuts the executor down.
     * A failure while closing the channel is logged, not propagated.
     */
    @Override
    public void stop() {
        this.running.set(false);
        try {
            this.channel.socket().close();
            this.channel.close();
        } catch (IOException e) {
            LOG.error("Error closing sensor channel.", e);
        }
        this.scheduler.shutdown();
    }

    /**
     * @return {@code true} if the channel's socket is closed and the sensor is
     *         no longer marked as running
     */
    public boolean isShutdown() {
        return this.channel.socket().isClosed() && !this.running.get();
    }
}
