package org.apache.hama.monitor;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hama.HamaConfiguration;

/* loaded from: input_file:org/apache/hama/monitor/Federator.class */
/**
 * Daemon thread that dispatches registered {@link Act}s: each act's
 * {@link Collector} is submitted to an internal cached thread pool and the
 * resulting {@link Future} is handed to the act's {@link CollectorHandler}.
 *
 * <p>Acts are queued through {@link #register(Act)} and consumed one at a time
 * by {@link #run()} until the thread is interrupted.
 */
public final class Federator extends Thread {
    public static final Log LOG = LogFactory.getLog(Federator.class);

    /** Pending acts waiting to be dispatched by {@link #run()}. */
    private final BlockingQueue<Act> commands = new LinkedBlockingQueue<Act>();

    /** Pool on which the collectors' {@code harvest()} calls are executed. */
    private final ExecutorService workers = Executors.newCachedThreadPool();

    /**
     * Pairs a {@link Collector} with the {@link CollectorHandler} that will
     * receive the future of its result.
     */
    public static final class Act {
        final Collector collector;
        final CollectorHandler handler;

        /**
         * @param collector the work to execute; not validated here, but
         *        {@link Federator#register(Act)} rejects a {@code null} value
         * @param collectorHandler the consumer of the pending result; not
         *        validated here, but {@link Federator#register(Act)} rejects a
         *        {@code null} value
         */
        public Act(Collector collector, CollectorHandler collectorHandler) {
            this.collector = collector;
            this.handler = collectorHandler;
        }

        /** @return the collector supplied at construction time */
        public final Collector collector() {
            return this.collector;
        }

        /** @return the handler supplied at construction time */
        public final CollectorHandler handler() {
            return this.handler;
        }
    }

    /** Unit of work whose result is produced on a worker thread. */
    public interface Collector {
        /**
         * Produces the collected value.
         *
         * @return the harvested result
         * @throws Exception if harvesting fails; surfaced through the
         *         {@link Future} passed to the handler
         */
        Object harvest() throws Exception;
    }

    /** Receives the pending result of a submitted {@link Collector}. */
    public interface CollectorHandler {
        /**
         * Called on the Federator thread right after the collector has been
         * submitted, so the future may not be completed yet.
         *
         * @param future the pending result of {@link Collector#harvest()}
         */
        void handle(Future<Object> future);
    }

    /** Adapts a {@link Collector} to {@link Callable} for the worker pool. */
    private static class ServiceWorker implements Callable<Object> {
        final Collector collector;

        public ServiceWorker(Collector collector) {
            this.collector = collector;
        }

        @Override
        public Object call() throws Exception {
            return this.collector.harvest();
        }
    }

    /**
     * Creates the dispatcher as a daemon thread named after this class.
     *
     * @param hamaConfiguration currently unused by this constructor
     */
    public Federator(HamaConfiguration hamaConfiguration) {
        setName(Federator.class.getSimpleName());
        setDaemon(true);
    }

    /**
     * Queues an act for dispatching.
     *
     * <p>If the calling thread is interrupted while enqueueing, the error is
     * logged, the interrupt flag is restored and the act is not queued.
     *
     * @param act the collector/handler pair to dispatch
     * @throws NullPointerException if {@code act}, its collector or its
     *         handler is {@code null}
     */
    public final void register(Act act) {
        if (null == act || null == act.collector() || null == act.handler()) {
            throw new NullPointerException("Collector or CollectorHandler  is not provided.");
        }
        try {
            this.commands.put(act);
        } catch (InterruptedException e) {
            LOG.error("Interrupted while registering an act; the act was not queued.", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Takes queued acts one by one and dispatches them until the thread is
     * interrupted, then shuts the worker pool down.
     */
    @Override
    public void run() {
        try {
            // isInterrupted() rather than Thread.interrupted(): do not clear the
            // flag on exit, so both exit paths leave the thread interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                dispatch(this.commands.take());
            }
        } catch (InterruptedException e) {
            LOG.error("Interrupted while waiting for an act; stopping.", e);
            Thread.currentThread().interrupt();
        } finally {
            // Nothing submits work once this loop has ended; shutdown() lets
            // already-submitted collectors finish and releases idle workers.
            this.workers.shutdown();
        }
    }

    /**
     * Submits the act's collector and passes the future to its handler. A
     * runtime failure is logged instead of propagated so that one faulty
     * act cannot terminate the dispatcher thread.
     */
    private void dispatch(Act act) {
        try {
            CollectorHandler handler = act.handler();
            Future<Object> pending = this.workers.submit(new ServiceWorker(act.collector()));
            handler.handle(pending);
        } catch (RuntimeException e) {
            LOG.error("Failed to dispatch an act; continuing with the next one.", e);
        }
    }
}
