package org.graylog2.outputs;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.inject.Inject;
import org.graylog2.Configuration;
import org.graylog2.indexer.cluster.Cluster;
import org.graylog2.indexer.messages.Messages;
import org.graylog2.plugin.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/outputs/BatchedElasticSearchOutput.class */
public class BatchedElasticSearchOutput extends ElasticSearchOutput {
    private static final Logger LOG = LoggerFactory.getLogger(BatchedElasticSearchOutput.class);
    private final List<Message> buffer;
    private final int maxBufferSize;
    private final ExecutorService flushThread;
    private final Timer processTime;
    private final Histogram batchSize;
    private final Meter bufferFlushes;
    private final Meter bufferFlushesRequested;
    private final Cluster cluster;

    @Inject
    public BatchedElasticSearchOutput(MetricRegistry metricRegistry, Messages messages, Cluster cluster, Configuration configuration) {
        super(metricRegistry, messages);
        this.flushThread = Executors.newSingleThreadExecutor();
        this.cluster = cluster;
        this.maxBufferSize = configuration.getOutputBatchSize();
        this.buffer = Lists.newArrayListWithCapacity(this.maxBufferSize);
        this.processTime = metricRegistry.timer(MetricRegistry.name(getClass(), "processTime"));
        this.batchSize = metricRegistry.histogram(MetricRegistry.name(getClass(), "batchSize"));
        this.bufferFlushes = metricRegistry.meter(MetricRegistry.name(getClass(), "bufferFlushes"));
        this.bufferFlushesRequested = metricRegistry.meter(MetricRegistry.name(getClass(), "bufferFlushesRequested"));
    }

    @Override // org.graylog2.outputs.ElasticSearchOutput, org.graylog2.plugin.outputs.MessageOutput
    public void write(Message message) throws Exception {
        synchronized (this.buffer) {
            this.buffer.add(message);
            if (LOG.isTraceEnabled()) {
                LOG.trace("Buffering message id to [{}]: <{}>", getName(), message.getId());
            }
            if (this.buffer.size() >= this.maxBufferSize) {
                flush();
            }
        }
    }

    @Override // org.graylog2.outputs.ElasticSearchOutput, org.graylog2.plugin.outputs.MessageOutput
    public String getHumanName() {
        return "Elasticsearch Output with Batching";
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void synchronousFlush(List<Message> list) {
        LOG.debug("[{}] Starting flushing {} messages", Thread.currentThread(), Integer.valueOf(list.size()));
        try {
            Timer.Context time = this.processTime.time();
            Throwable th = null;
            try {
                try {
                    write(list);
                    this.batchSize.update(list.size());
                    this.bufferFlushes.mark();
                    if (time != null) {
                        if (0 != 0) {
                            try {
                                time.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            time.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            LOG.error("Unable to flush message buffer", (Throwable) e);
        }
        LOG.debug("[{}] Flushing {} messages completed", Thread.currentThread(), Integer.valueOf(list.size()));
    }

    private void asynchronousFlush(final List<Message> list) {
        LOG.debug("Submitting new flush thread");
        this.flushThread.submit(new Runnable() { // from class: org.graylog2.outputs.BatchedElasticSearchOutput.1
            @Override // java.lang.Runnable
            public void run() {
                BatchedElasticSearchOutput.this.synchronousFlush(list);
            }
        });
    }

    public void flush() {
        flush(true);
    }

    @VisibleForTesting
    void flush(boolean z) {
        ImmutableList copyOf;
        this.bufferFlushesRequested.mark();
        if (this.buffer.isEmpty()) {
            LOG.debug("Not flushing empty buffer");
            return;
        }
        if (!this.cluster.isConnectedAndHealthy()) {
            LOG.warn("Clearing buffer ({} messages) because the Elasticsearch cluster is down.", Integer.valueOf(this.buffer.size()));
            this.buffer.clear();
            return;
        }
        synchronized (this.buffer) {
            copyOf = ImmutableList.copyOf((Collection) this.buffer);
            this.buffer.clear();
        }
        if (z) {
            asynchronousFlush(copyOf);
        } else {
            synchronousFlush(copyOf);
        }
    }
}
