package org.apache.hama.monitor;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.util.ZKUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

/* loaded from: input_file:org/apache/hama/monitor/Monitor.class */
public final class Monitor extends Thread implements MonitorListener {
    /** Logger shared by the monitor and its nested helper classes. */
    public static final Log LOG = LogFactory.getLog(Monitor.class);
    /** Leading segment of the znode path under which {@link ZKHandler} publishes results. */
    public static final String MONITOR_ROOT_PATH = "/monitor/";
    /** Task workers handed to both the initializer (writer) and the collector (reader). */
    private final Map<String, TaskWorker> workers = new ConcurrentHashMap<>();
    /** Results queued by {@link #notify(Result)} and drained by the publisher. */
    private final BlockingQueue<Result> results = new LinkedBlockingQueue<>();
    /** Thread that takes queued results and hands them to destination handlers. */
    private final Publisher publisher;
    /** Thread that periodically submits every registered task worker for execution. */
    private final Collector collector;
    /** Thread that periodically reloads the task list into {@link #workers}. */
    private final Initializer initializer;
    /** Configuration passed to all helper threads; never {@code null}. */
    private final Configuration configuration;
    /** ZooKeeper client handed to the {@link ZKHandler}; never {@code null}. */
    private final ZooKeeper zookeeper;
    /** Name of the groom server, used as a znode path segment; never {@code null}. */
    private final String groomServerName;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hama/monitor/Monitor$Collector.class */
    /**
     * Daemon thread that periodically submits every registered {@link TaskWorker}
     * to a cached thread pool.
     *
     * <p>The period between two rounds is read from the configuration key
     * {@code bsp.monitor.collector.period} (seconds, default 5) on every iteration.
     */
    public static final class Collector extends Thread {
        final ExecutorService pool = Executors.newCachedThreadPool();
        final Configuration conf;
        final Map<String, TaskWorker> workers;

        /**
         * @param configuration source of the collection period
         * @param map task workers to submit on each round; iterated, never modified here
         */
        Collector(Configuration configuration, Map<String, TaskWorker> map) {
            this.conf = configuration;
            this.workers = map;
            setName(getClass().getSimpleName());
            setDaemon(true);
        }

        /**
         * Submits all workers, sleeps for the configured period and repeats until the
         * thread is interrupted. The pool is shut down on every exit path.
         */
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    Monitor.LOG.debug("How many workers will be executed by collector? " + this.workers.size());
                    for (TaskWorker worker : this.workers.values()) {
                        this.pool.submit(worker);
                    }
                    // Multiply as long so a large configured period cannot overflow int.
                    Thread.sleep(this.conf.getInt("bsp.monitor.collector.period", 5) * 1000L);
                }
            } catch (InterruptedException e) {
                Monitor.LOG.warn(getClass().getSimpleName() + " is interrupted.", e);
                Thread.currentThread().interrupt();
            } finally {
                // Also reached when the loop ends because the interrupt flag was observed
                // outside of sleep(); without this the pool would never be shut down.
                this.pool.shutdown();
            }
        }
    }

    /* loaded from: input_file:org/apache/hama/monitor/Monitor$Destination.class */
    /**
     * Targets a {@link Result} can be published to. Each constant carries a
     * lower-case textual label.
     */
    public enum Destination {
        ZK("zk"),
        HDFS("hdfs"),
        JMX("jmx");

        /** Textual label of this destination. */
        final String dest;

        Destination(String label) {
            dest = label;
        }

        /**
         * @return the textual label supplied when the constant was declared
         */
        public String value() {
            return dest;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hama/monitor/Monitor$Initializer.class */
    /**
     * Daemon thread that periodically asks {@code Configurator} for the current task
     * list and (re)registers a {@link TaskWorker} for every returned task.
     *
     * <p>The reload period is read from the configuration key
     * {@code bsp.monitor.initializer.period} (seconds, default 5) on every iteration.
     */
    public static final class Initializer extends Thread {
        final Configuration conf;
        final Map<String, TaskWorker> workers;
        final MonitorListener listener;

        /**
         * @param map registry that receives the task workers
         * @param configuration configuration handed to {@code Configurator.configure};
         *        it is cast to {@code HamaConfiguration} in {@link #run()}
         * @param monitorListener listener passed on to {@code Configurator.configure}
         */
        Initializer(Map<String, TaskWorker> map, Configuration configuration, MonitorListener monitorListener) {
            this.workers = map;
            this.conf = configuration;
            this.listener = monitorListener;
            // Same thread setup as Collector and Publisher, so this helper is named in
            // logs and thread dumps and does not keep the JVM alive on its own.
            setName(getClass().getSimpleName());
            setDaemon(true);
        }

        /**
         * Reloads the task list and sleeps for the configured period until interrupted.
         * An {@link IOException} from the configurator ends the thread.
         */
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    Map<String, Task> configure = Configurator.configure((HamaConfiguration) this.conf, this.listener);
                    if (null != configure) {
                        for (Map.Entry<String, Task> entry : configure.entrySet()) {
                            // Always install a fresh worker for the key: a single put() has the
                            // same end state as the former putIfAbsent()+replace() pair, without
                            // the raw cast, the second allocation and the non-atomic window.
                            this.workers.put(entry.getKey(), new TaskWorker(entry.getValue()));
                        }
                    }
                    Monitor.LOG.debug("Task worker list's size: " + this.workers.size());
                    // Multiply as long so a large configured period cannot overflow int.
                    Thread.sleep(this.conf.getInt("bsp.monitor.initializer.period", 5) * 1000L);
                }
            } catch (IOException e) {
                Monitor.LOG.warn(getClass().getSimpleName() + " can not load jar file  from plugin directory.", e);
                Thread.currentThread().interrupt();
            } catch (InterruptedException e2) {
                Monitor.LOG.warn(getClass().getSimpleName() + " is interrupted.", e2);
                Thread.currentThread().interrupt();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hama/monitor/Monitor$Publisher.class */
    /**
     * Daemon thread that takes {@link Result}s from a queue and dispatches each one,
     * on a cached thread pool, to the handlers bound to the result's destinations.
     *
     * <p>After every dispatched result the thread sleeps for the period read from
     * {@code bsp.monitor.publisher.period} (seconds, default 5).
     */
    public static final class Publisher extends Thread {
        final Configuration conf;
        final BlockingQueue<Result> results;
        final ConcurrentMap<Destination, PublisherHandler> handlers = new ConcurrentHashMap<>();
        final ExecutorService pool = Executors.newCachedThreadPool();

        /**
         * @param configuration source of the publishing period
         * @param blockingQueue queue the results are taken from
         */
        Publisher(Configuration configuration, BlockingQueue<Result> blockingQueue) {
            this.conf = configuration;
            this.results = blockingQueue;
            setName(getClass().getSimpleName());
            setDaemon(true);
        }

        /**
         * Registers a handler for a destination. An already registered handler is kept.
         *
         * @param destination destination the handler serves
         * @param publisherHandler handler to register
         */
        void bind(Destination destination, PublisherHandler publisherHandler) {
            this.handlers.putIfAbsent(destination, publisherHandler);
        }

        /**
         * @param destination destination to look up
         * @return the handler bound to {@code destination}, or {@code null} if none is bound
         */
        PublisherHandler get(Destination destination) {
            return this.handlers.get(destination);
        }

        /**
         * Takes results and dispatches them until the thread is interrupted.
         * The pool is shut down on every exit path.
         */
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    final Result take = this.results.take();
                    this.pool.submit(new Callable<Object>() {
                        @Override
                        public Object call() {
                            dispatch(take);
                            return null;
                        }
                    });
                    // Multiply as long so a large configured period cannot overflow int.
                    Thread.sleep(this.conf.getInt("bsp.monitor.publisher.period", 5) * 1000L);
                }
            } catch (InterruptedException e) {
                Monitor.LOG.warn("Publisher is interrupted.", e);
                Thread.currentThread().interrupt();
            } finally {
                // Also reached when the loop ends because the interrupt flag was observed
                // outside of take()/sleep(); without this the pool would never be shut down.
                this.pool.shutdown();
            }
        }

        /**
         * Hands one result to the handler of each of its destinations. A destination
         * without a bound handler, or a handler that throws, is logged and skipped so
         * the remaining destinations are still served.
         */
        private void dispatch(Result result) {
            Destination[] destinations = result.destinations();
            if (null == destinations) {
                Monitor.LOG.warn("Result " + result.name() + " declares no destinations; nothing is published.");
                return;
            }
            for (Destination destination : destinations) {
                PublisherHandler handler = null == destination ? null : get(destination);
                if (null == handler) {
                    Monitor.LOG.warn("No handler is bound to destination " + destination + "; result " + result.name() + " is not published there.");
                    continue;
                }
                try {
                    handler.handle(result);
                } catch (RuntimeException e) {
                    // The Future returned by submit() is never inspected, so log here
                    // instead of letting the failure disappear silently.
                    Monitor.LOG.warn("Fail publishing result " + result.name() + " to " + destination + ".", e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hama/monitor/Monitor$PublisherHandler.class */
    /**
     * Sink for results of one {@link Destination}. Handlers are registered through
     * {@code Publisher.bind} and invoked by the publisher for each result that lists
     * the destination they are bound to.
     */
    public interface PublisherHandler {
        /**
         * Publishes a single result.
         *
         * @param result the result taken from the monitor's queue
         */
        void handle(Result result);
    }

    /* loaded from: input_file:org/apache/hama/monitor/Monitor$Result.class */
    /**
     * Outcome reported to the monitor through {@code MonitorListener.notify} and
     * later dispatched by the publisher.
     */
    public interface Result {
        /**
         * @return the result's name; {@link ZKHandler} uses it as the last segment of
         *         the znode path it publishes under
         */
        String name();

        /**
         * @return the destinations this result should be published to; the publisher
         *         looks up one handler per entry
         */
        Destination[] destinations();

        /**
         * @return the payload; {@link ZKHandler} only processes payloads that are a
         *         {@code MetricsRecord}
         */
        Object get();
    }

    /* loaded from: input_file:org/apache/hama/monitor/Monitor$Task.class */
    /**
     * Base class of a unit of monitoring work. A task has a fixed name and an
     * atomically replaceable {@link MonitorListener}; subclasses supply the work in
     * {@link #run()}.
     */
    public static abstract class Task {
        final String name;
        final AtomicReference<MonitorListener> listener = new AtomicReference<>();

        /**
         * @param str the task's name, returned unchanged by {@link #getName()}
         */
        public Task(String str) {
            name = str;
        }

        /**
         * Performs the task's work.
         *
         * @return the value produced by the task
         * @throws TaskException if the task fails
         */
        public abstract Object run() throws TaskException;

        /**
         * @return the name given at construction time
         */
        public final String getName() {
            return name;
        }

        /**
         * @return the listener most recently set, or {@code null} if none was set
         */
        public final MonitorListener getListener() {
            return listener.get();
        }

        /**
         * Replaces the listener associated with this task.
         *
         * @param monitorListener the new listener
         */
        public final void setListener(MonitorListener monitorListener) {
            listener.set(monitorListener);
        }
    }

    /* loaded from: input_file:org/apache/hama/monitor/Monitor$TaskException.class */
    /**
     * Checked exception thrown by {@link Task#run()} to signal that a task failed.
     */
    public static class TaskException extends Exception {
        private static final long serialVersionUID = -8982174675196566540L;

        /** Creates an exception without message or cause. */
        public TaskException() {
            super();
        }

        /**
         * Creates an exception that wraps another throwable.
         *
         * @param th the cause
         */
        public TaskException(Throwable th) {
            super(th);
        }

        /**
         * Creates an exception with a detail message.
         *
         * @param str the detail message
         */
        public TaskException(String str) {
            super(str);
        }
    }

    /* loaded from: input_file:org/apache/hama/monitor/Monitor$TaskWorker.class */
    /**
     * Adapts a {@link Task} to {@link Callable} so it can be submitted to an
     * executor.
     */
    static class TaskWorker implements Callable<Object> {
        final Task task;

        /**
         * @param task the task to execute when this worker is called
         */
        TaskWorker(Task task) {
            this.task = task;
        }

        /**
         * Runs the wrapped task once.
         *
         * @return whatever the task's {@code run()} returns
         * @throws Exception whatever the task's {@code run()} throws
         */
        @Override
        public Object call() throws Exception {
            final Object outcome = task.run();
            return outcome;
        }
    }

    /* loaded from: input_file:org/apache/hama/monitor/Monitor$ZKHandler.class */
    /**
     * {@link PublisherHandler} that writes the metrics of a {@code MetricsRecord}
     * result to ZooKeeper, one znode per metric, below
     * {@code /monitor/<groomServerName>/metrics/<result name>}.
     */
    public static final class ZKHandler implements PublisherHandler {
        final ZooKeeper zk;
        final String groomServerName;

        /**
         * @param zooKeeper client used for all znode operations
         * @param str groom server name, used as a path segment
         */
        public ZKHandler(ZooKeeper zooKeeper, String str) {
            this.zk = zooKeeper;
            this.groomServerName = str;
        }

        /**
         * Publishes every metric of the result. Results whose payload is not a
         * {@code MetricsRecord} are logged and ignored. A {@link KeeperException} on one
         * metric is logged and the next metric is tried; an interrupt restores the
         * thread's interrupt status and stops publishing.
         *
         * @param result the result to publish
         */
        @Override
        public void handle(Result result) {
            Object obj = result.get();
            if (!(obj instanceof MetricsRecord)) {
                Monitor.LOG.warn(ZKHandler.class.getSimpleName() + " don't know how to handle the result." + obj);
                return;
            }
            String parentPath = Monitor.MONITOR_ROOT_PATH + this.groomServerName + "/metrics/" + result.name();
            ZKUtil.create(this.zk, parentPath);
            for (Metric<?> metric : ((MetricsRecord) obj).metrics()) {
                String name = metric.name();
                Number number = (Number) metric.value();
                try {
                    if (null == this.zk.exists(parentPath, false)) {
                        // Parent znode is missing; nothing can be written below it.
                        continue;
                    }
                    if (Monitor.LOG.isDebugEnabled()) {
                        Monitor.LOG.debug("Publish name [" + name + "] and value [" + number + "] to zk.");
                    }
                    String metricPath = parentPath + ZKUtil.ZK_SEPARATOR + name + suffix(number);
                    if (null == this.zk.exists(metricPath, false)) {
                        String created = this.zk.create(metricPath, toBytes(number), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                        Monitor.LOG.debug("Publish data to zk with path to `" + created + "'");
                    } else {
                        // Version -1 matches any znode version.
                        this.zk.setData(metricPath, toBytes(number), -1);
                        // Log the znode that was actually updated, not its parent.
                        Monitor.LOG.debug("Update data in znode: " + metricPath);
                    }
                } catch (InterruptedException e) {
                    Monitor.LOG.warn("Interrupted while publishing to " + parentPath + ".", e);
                    // Restore the flag so the owning executor/thread can observe it.
                    Thread.currentThread().interrupt();
                    return;
                } catch (KeeperException e2) {
                    Monitor.LOG.warn("Fail publishing metric " + name + " below " + parentPath + ".", e2);
                }
            }
        }

        /**
         * Maps the runtime type of a value to the suffix appended to its znode name.
         *
         * @param number the metric value
         * @return {@code _b}, {@code _d}, {@code _f}, {@code _i} or {@code _l} for Byte,
         *         Double, Float, Integer or Long; {@code _?} for anything else
         */
        static String suffix(Number number) {
            // NOTE(review): toBytes() encodes Short values but there is no suffix for
            // them here, so they are published under "_?" - confirm whether readers
            // expect a dedicated suffix before adding one.
            if (number instanceof Byte) {
                return "_b";
            }
            if (number instanceof Double) {
                return "_d";
            }
            if (number instanceof Float) {
                return "_f";
            }
            if (number instanceof Integer) {
                return "_i";
            }
            if (number instanceof Long) {
                return "_l";
            }
            return "_?";
        }

        /**
         * Encodes a value in the big-endian format written by {@link DataOutputStream}.
         *
         * @param number the value to encode
         * @return one byte for a Byte; the {@code DataOutputStream} encoding for Double,
         *         Float, Integer, Short and Long; an empty array for any other type
         *         (a warning is logged); {@code null} if writing fails
         */
        static byte[] toBytes(Number number) {
            if (number instanceof Byte) {
                return new byte[]{number.byteValue()};
            }
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                if (number instanceof Double) {
                    out.writeDouble(number.doubleValue());
                } else if (number instanceof Float) {
                    out.writeFloat(number.floatValue());
                } else if (number instanceof Integer) {
                    out.writeInt(number.intValue());
                } else if (number instanceof Short) {
                    out.writeShort(number.shortValue());
                } else if (number instanceof Long) {
                    out.writeLong(number.longValue());
                } else {
                    Monitor.LOG.warn("Unkown data type: " + number);
                }
                out.flush();
            } catch (IOException e) {
                Monitor.LOG.warn("Fail writing data to output stream.", e);
                return null;
            }
            byte[] bytes = buffer.toByteArray();
            if (Monitor.LOG.isDebugEnabled()) {
                Monitor.LOG.debug("bytes's length after value (" + number + ") is converted: " + bytes.length);
            }
            return bytes;
        }
    }

    public Monitor(Configuration configuration, ZooKeeper zooKeeper, String str) {
        this.configuration = configuration;
        if (null == this.configuration) {
            throw new NullPointerException("No configuration is provided.");
        }
        this.zookeeper = zooKeeper;
        if (null == this.zookeeper) {
            throw new NullPointerException("ZooKeeper is not provided.");
        }
        this.groomServerName = str;
        if (null == this.groomServerName) {
            throw new NullPointerException("Groom server name is not provided.");
        }
        this.initializer = new Initializer(this.workers, configuration, this);
        this.collector = new Collector(configuration, this.workers);
        this.publisher = new Publisher(configuration, this.results);
        this.publisher.bind(Destination.ZK, new ZKHandler(this.zookeeper, this.groomServerName));
        setName(getClass().getSimpleName());
        setDaemon(true);
    }

    /**
     * Starts the helper threads in the order initializer, collector, publisher.
     */
    public void initialize() {
        final Thread[] helpers = {this.initializer, this.collector, this.publisher};
        for (Thread helper : helpers) {
            helper.start();
        }
    }

    /**
     * Queues a result for the publisher. If the calling thread is interrupted while
     * waiting on the queue, a warning is logged and the interrupt status is restored.
     *
     * @param result the result to enqueue
     */
    @Override
    public void notify(Result result) {
        try {
            this.results.put(result);
        } catch (InterruptedException e) {
            LOG.warn(getClass().getSimpleName() + " is interrupted.", e);
            Thread.currentThread().interrupt();
            return;
        }
        final int queued = this.results.size();
        LOG.debug(result.name() + " is put to queue (size is " + queued + ")");
    }

    /**
     * Thread entry point: delegates to {@link #initialize()} and then returns.
     */
    @Override
    public void run() {
        this.initialize();
    }
}
