package org.apache.hama.metrics;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hama.HamaConfiguration;

/* loaded from: input_file:org/apache/hama/metrics/MetricsSinkAdaptor.class */
class MetricsSinkAdaptor {
    public static final Log LOG = LogFactory.getLog(MetricsSinkAdaptor.class);
    private final String name;
    private final String description;
    private final BlockingQueue<List<Pair>> queue;
    private final HamaConfiguration conf;
    private final int MAX_QUEUE_SIZE;
    private final Worker worker;
    private final MetricsFilter sourceFilter;
    private final MetricsFilter recordFilter;
    private final MetricsFilter metricFilter;

    /* loaded from: input_file:org/apache/hama/metrics/MetricsSinkAdaptor$Worker.class */
    private class Worker implements Callable {
        private final BlockingQueue<List<Pair>> queue;
        private final MetricsSink sink;
        private final AtomicBoolean state = new AtomicBoolean(false);
        private final ScheduledExecutorService sched = Executors.newScheduledThreadPool(1);

        Worker(BlockingQueue<List<Pair>> blockingQueue, MetricsSink metricsSink) {
            this.queue = blockingQueue;
            this.sink = metricsSink;
        }

        void start() {
            this.state.set(true);
            this.sched.schedule(this, 0L, TimeUnit.SECONDS);
        }

        MetricsSink sink() {
            return this.sink;
        }

        @Override // java.util.concurrent.Callable
        public Object call() throws Exception {
            while (this.state.get()) {
                List<Pair> take = this.queue.take();
                if (MetricsSinkAdaptor.LOG.isDebugEnabled()) {
                    MetricsSinkAdaptor.LOG.debug(" Pairs (in queue) size " + (null != take ? Integer.valueOf(take.size()) : "0"));
                }
                for (Pair pair : take) {
                    if (MetricsSinkAdaptor.LOG.isDebugEnabled()) {
                        MetricsSinkAdaptor.LOG.debug(" Pair's name " + pair.name() + " record size " + (null != pair.records() ? Integer.valueOf(pair.records().size()) : "0"));
                    }
                    if (MetricsSinkAdaptor.this.sourceFilter == null || MetricsSinkAdaptor.this.sourceFilter.accepts(pair.name())) {
                        for (MetricsRecord metricsRecord : pair.records()) {
                            if (MetricsSinkAdaptor.this.recordFilter == null || MetricsSinkAdaptor.this.recordFilter.accepts(metricsRecord)) {
                                this.sink.putMetrics(metricsRecord);
                            }
                        }
                    }
                }
            }
            return null;
        }

        void stop() {
            this.state.set(false);
            this.sched.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MetricsSinkAdaptor(String str, String str2, MetricsSink metricsSink, BlockingQueue<List<Pair>> blockingQueue, HamaConfiguration hamaConfiguration, MetricsFilter metricsFilter, MetricsFilter metricsFilter2, MetricsFilter metricsFilter3) {
        this.name = str;
        this.description = str2;
        this.queue = blockingQueue;
        this.conf = hamaConfiguration;
        this.MAX_QUEUE_SIZE = hamaConfiguration.getInt("bsp.metrics.max_queue_size", 100);
        this.worker = new Worker(blockingQueue, metricsSink);
        this.sourceFilter = metricsFilter;
        this.recordFilter = metricsFilter2;
        this.metricFilter = metricsFilter3;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void putMetrics(List<Pair> list) {
        try {
            if (this.queue.size() == this.MAX_QUEUE_SIZE) {
                this.queue.take();
            }
            this.queue.put(list);
        } catch (Exception e) {
            LOG.error("Fail to add data to designated queue.", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        this.worker.start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        this.worker.stop();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final String name() {
        return this.name;
    }

    final String description() {
        return this.description;
    }

    public MetricsSink sink() {
        return this.worker.sink();
    }
}
