package org.elasticsearch.storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.EsHadoopException;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.rest.RestService;
import org.elasticsearch.storm.cfg.StormConfigurationOptions;
import org.elasticsearch.storm.cfg.StormSettings;
import org.elasticsearch.storm.serialization.StormTupleBytesConverter;
import org.elasticsearch.storm.serialization.StormTupleFieldExtractor;
import org.elasticsearch.storm.serialization.StormValueWriter;

/* loaded from: input_file:org/elasticsearch/storm/EsBolt.class */
/**
 * Storm bolt that writes every received tuple to Elasticsearch through a
 * {@link RestService.PartitionWriter}.
 *
 * <p>Two modes are supported, selected by the value {@code StormSettings.getStormBoltAck()}
 * returns in {@link #prepare}:
 * <ul>
 *   <li><b>no-ack mode</b> (default): each tuple is acked as soon as it has been handed to the
 *       writer, and failed if handing it over throws;</li>
 *   <li><b>ack mode</b>: tuples are kept in an in-flight list and are only acked or failed when
 *       the writer is flushed, based on the {@link BitSet} returned by the flush attempt.</li>
 * </ul>
 *
 * <p>Only {@code boltConfig} is part of the serialized state; every other field is transient
 * and is (re)initialised by {@link #prepare}.
 */
public class EsBolt implements IRichBolt {
    // Static state is never serialized, so 'transient' was meaningless here; 'final' prevents
    // accidental reassignment of the shared logger.
    private static final Log log = LogFactory.getLog(EsBolt.class);

    private transient RestService.PartitionWriter writer;
    private transient OutputCollector collector;

    // Settings supplied at construction time. Deliberately left non-final so the default
    // serialVersionUID computed for this class is not altered.
    private Map<Object, Object> boltConfig = new LinkedHashMap<Object, Object>();

    private transient boolean flushOnTickTuple = true;
    private transient boolean ackWrites = false;
    // Tuples written but not yet acked/failed; only allocated in ack mode.
    private transient List<Tuple> inflightTuples = null;
    // Number of in-flight tuples that triggers a flush; stays 0 outside ack mode.
    private transient int numberOfEntries = 0;

    /**
     * Creates a bolt writing to the given resource.
     *
     * @param str value stored under {@link ConfigurationOptions#ES_RESOURCE_WRITE}
     */
    public EsBolt(String str) {
        this.boltConfig.put(ConfigurationOptions.ES_RESOURCE_WRITE, str);
    }

    /**
     * Creates a bolt writing to the given resource with an explicit ack setting.
     *
     * @param str value stored under {@link ConfigurationOptions#ES_RESOURCE_WRITE}
     * @param z   value stored (as a string) under
     *            {@link StormConfigurationOptions#ES_STORM_BOLT_ACK}
     */
    public EsBolt(String str, boolean z) {
        this.boltConfig.put(ConfigurationOptions.ES_RESOURCE_WRITE, str);
        this.boltConfig.put(StormConfigurationOptions.ES_STORM_BOLT_ACK, Boolean.toString(z));
    }

    /**
     * Creates a bolt writing to the given resource with additional settings.
     *
     * @param str value stored under {@link ConfigurationOptions#ES_RESOURCE_WRITE}; it is put
     *            after {@code map} is copied, so it overrides any entry with the same key
     * @param map extra settings copied into the bolt configuration; {@code null} is treated as
     *            "no extra settings"
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // parameter kept raw to leave the public signature untouched
    public EsBolt(String str, Map map) {
        if (map != null) {
            this.boltConfig.putAll(map);
        }
        this.boltConfig.put(ConfigurationOptions.ES_RESOURCE_WRITE, str);
    }

    /**
     * Builds the effective settings and creates the partition writer.
     *
     * <p>The effective settings are a copy of {@code map} overlaid with the bolt's own
     * configuration, so values given to the constructor win over same-key values in {@code map}.
     *
     * @param map             configuration handed in by the caller (copied, not modified)
     * @param topologyContext used to obtain this task's index and the number of tasks of this
     *                        component
     * @param outputCollector collector used to ack/fail tuples
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Map is dictated by the method signature
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;

        LinkedHashMap merged = new LinkedHashMap(map);
        merged.putAll(this.boltConfig);
        StormSettings stormSettings = new StormSettings(merged);

        this.flushOnTickTuple = stormSettings.getStormTickTupleFlush();
        this.ackWrites = stormSettings.getStormBoltAck();

        if (this.ackWrites) {
            // In ack mode this bolt decides when to flush (see execute), so manual flushing is
            // switched on and the batch entry count is set to the configured bulk size.
            stormSettings.setProperty(ConfigurationOptions.ES_BATCH_FLUSH_MANUAL, Boolean.TRUE.toString());
            this.numberOfEntries = stormSettings.getStormBulkSize();
            stormSettings.setProperty(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, String.valueOf(this.numberOfEntries));
            this.inflightTuples = new ArrayList<Tuple>(this.numberOfEntries + 1);
        }

        int totalTasks = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();

        InitializationUtils.setValueWriterIfNotSet(stormSettings, StormValueWriter.class, log);
        InitializationUtils.setBytesConverterIfNeeded(stormSettings, StormTupleBytesConverter.class, log);
        InitializationUtils.setFieldExtractorIfNotSet(stormSettings, StormTupleFieldExtractor.class, log);

        this.writer = RestService.createWriter(stormSettings, topologyContext.getThisTaskIndex(), totalTasks, log);
    }

    /**
     * Handles a single tuple.
     *
     * <p>When tick-tuple flushing is enabled and {@code tuple} is a tick tuple, the writer is
     * flushed and nothing is written. Otherwise the tuple is written; in no-ack mode it is
     * acked immediately (or failed if a {@link RuntimeException} is thrown), in ack mode it is
     * recorded as in-flight and a flush is triggered once the in-flight count reaches the
     * configured number of entries.
     *
     * @param tuple the tuple to process
     * @throws RuntimeException whatever the write, flush or ack raised; always rethrown
     */
    public void execute(Tuple tuple) {
        if (this.flushOnTickTuple && TupleUtils.isTickTuple(tuple)) {
            flush();
            return;
        }

        if (this.ackWrites) {
            this.inflightTuples.add(tuple);
        }

        try {
            this.writer.repository.writeToIndex(tuple);

            // numberOfEntries is only ever non-zero in ack mode, where inflightTuples exists.
            if (this.ackWrites && this.numberOfEntries > 0 && this.inflightTuples.size() >= this.numberOfEntries) {
                flush();
            }

            if (!this.ackWrites) {
                this.collector.ack(tuple);
            }
        } catch (RuntimeException e) {
            // NOTE(review): in ack mode a tuple whose write failed stays in inflightTuples and is
            // not failed here; confirm this is intended given the exception is rethrown.
            if (!this.ackWrites) {
                this.collector.fail(tuple);
            }
            throw e;
        }
    }

    /** Flushes the writer using the strategy matching the current ack mode. */
    private void flush() {
        if (this.ackWrites) {
            flushWithAck();
        } else {
            flushNoAck();
        }
    }

    /**
     * Flushes in ack mode and settles every in-flight tuple.
     *
     * <p>The i-th in-flight tuple is failed when bit {@code i} of the {@link BitSet} returned by
     * {@code tryFlush()} is set, and acked otherwise (presumably a set bit marks an entry that
     * was not written - confirm against the repository implementation). If the flush attempt
     * throws an {@link EsHadoopException}, all in-flight tuples are failed and the exception is
     * rethrown. The in-flight list is emptied in both cases.
     */
    private void flushWithAck() {
        try {
            BitSet leftovers = this.writer.repository.tryFlush();
            this.writer.repository.discard();

            for (int i = 0; i < this.inflightTuples.size(); i++) {
                Tuple tuple = this.inflightTuples.get(i);
                if (leftovers.get(i)) {
                    this.collector.fail(tuple);
                } else {
                    this.collector.ack(tuple);
                }
            }
            // Cleared in one go rather than removing entries one by one while iterating.
            this.inflightTuples.clear();
        } catch (EsHadoopException e) {
            for (Tuple tuple : this.inflightTuples) {
                this.collector.fail(tuple);
            }
            this.inflightTuples.clear();
            throw e;
        }
    }

    /** Flushes in no-ack mode; tuples were already acked in {@link #execute}. */
    private void flushNoAck() {
        this.writer.repository.flush();
    }

    /**
     * Flushes pending data and closes the writer. Safe to call when the writer was never
     * created or has already been closed; the writer is closed even if the flush throws.
     */
    public void cleanup() {
        if (this.writer != null) {
            try {
                flush();
            } finally {
                this.writer.close();
                this.writer = null;
            }
        }
    }

    /** Declares nothing: this bolt emits no tuples. */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    /**
     * @return {@code null}; this bolt supplies no component-specific configuration
     */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
