package org.apache.storm.kafka;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import javax.servlet.jsp.tagext.TagAttributeInfo;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.trident.MaxMetric;
import org.apache.storm.metric.api.CombinedMetric;
import org.apache.storm.metric.api.CountMetric;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.ReducedMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kafka/PartitionManager.class */
public class PartitionManager {
    /** Logger shared by all instances of this class. */
    public static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class);

    // Per-partition metrics; getMetricsDataMap() reads and resets all five.
    private final CombinedMetric _fetchAPILatencyMax;
    private final ReducedMetric _fetchAPILatencyMean;
    private final CountMetric _fetchAPICallCount;
    private final CountMetric _fetchAPIMessageCount;
    private final CountMetric _lostMessageCount;

    /** Offset fill() fetches from when no failed message is due for retry; advanced as messages are queued. */
    Long _emittedToOffset;

    /** Tracks failed offsets and decides when they are retried. */
    private final FailedMsgRetryManager _failedMsgRetryManager;

    /** Offset chosen at construction time, then the offset most recently written by commit(). */
    Long _committedTo;

    Partition _partition;
    SpoutConfig _spoutConfig;
    String _topologyInstanceId;
    SimpleConsumer _consumer;
    DynamicPartitionConnections _connections;
    ZkState _state;
    Map _stormConf;

    /** Offsets queued by fill() and not yet acked, mapped to the wall-clock millis at which they were first queued. */
    private SortedMap<Long, Long> _pending = new TreeMap<>();

    /** Messages fetched by fill() that next() has not emitted yet. */
    LinkedList<MessageAndOffset> _waitingToEmit = new LinkedList<>();

    // Running totals maintained by fail() and ack(); fail() uses them to abort
    // when failures pile up without a single ack.
    long numberFailed = 0;
    long numberAcked = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/storm/kafka/PartitionManager$KafkaMessageId.class */
    /**
     * Message id attached to every tuple emitted by {@link PartitionManager#next}:
     * the partition the message came from plus its offset in that partition.
     */
    public static class KafkaMessageId {
        /** Partition the message was read from. */
        public Partition partition;

        /** Offset of the message within {@link #partition}. */
        public long offset;

        /**
         * @param partition partition the message was read from
         * @param j         offset of the message within that partition
         */
        public KafkaMessageId(Partition partition, long j) {
            this.offset = j;
            this.partition = partition;
        }
    }

    /* loaded from: input_file:org/apache/storm/kafka/PartitionManager$OffsetData.class */
    /**
     * Snapshot of a partition's progress, as produced by
     * {@link PartitionManager#getOffsetData()}.
     */
    public static class OffsetData {
        /** Value of the manager's emitted-to offset when the snapshot was taken. */
        public long latestEmittedOffset;

        /** Value of {@code lastCompletedOffset()} when the snapshot was taken. */
        public long latestCompletedOffset;

        /**
         * @param j  latest emitted offset
         * @param j2 latest completed offset
         */
        public OffsetData(long j, long j2) {
            this.latestCompletedOffset = j2;
            this.latestEmittedOffset = j;
        }
    }

    /**
     * Registers a consumer for {@code partition} and decides the offset to start from.
     *
     * <p>The start offset is chosen as follows:
     * <ol>
     *   <li>If no usable state can be read from ZooKeeper, use the offset derived from
     *       {@code spoutConfig}.</li>
     *   <li>If the stored topology id differs from {@code str} and
     *       {@code spoutConfig.ignoreZkOffsets} is set, use the offset for
     *       {@code spoutConfig.startOffsetTime}.</li>
     *   <li>Otherwise use the offset stored in ZooKeeper.</li>
     * </ol>
     * Finally, if the chosen offset is more than {@code spoutConfig.maxOffsetBehind} behind the
     * offset derived from {@code spoutConfig}, or is not positive, it is replaced by that offset.
     *
     * @param dynamicPartitionConnections connection registry used to obtain (and later release) the consumer
     * @param str                         id of the running topology instance
     * @param zkState                     ZooKeeper-backed state used to read and write committed offsets
     * @param map                         topology configuration; {@code "topology.name"} is read on commit
     * @param spoutConfig                 spout configuration
     * @param partition                   the partition this manager is responsible for
     */
    public PartitionManager(DynamicPartitionConnections dynamicPartitionConnections, String str, ZkState zkState, Map map, SpoutConfig spoutConfig, Partition partition) {
        this._partition = partition;
        this._connections = dynamicPartitionConnections;
        this._spoutConfig = spoutConfig;
        this._topologyInstanceId = str;
        this._consumer = dynamicPartitionConnections.register(partition.host, partition.topic, partition.partition);
        this._state = zkState;
        this._stormConf = map;
        this._failedMsgRetryManager = new ExponentialBackoffMsgRetryManager(
                this._spoutConfig.retryInitialDelayMs,
                this._spoutConfig.retryDelayMultiplier,
                this._spoutConfig.retryDelayMaxMs);

        // Best-effort read of the previously committed state; any failure is logged and
        // treated the same as "nothing stored".
        String storedTopologyId = null;
        Long storedOffset = null;
        final String path = committedPath();
        try {
            Map<Object, Object> json = this._state.readJSON(path);
            LOG.info("Read partition information from: " + path + "  --> " + json);
            if (json != null) {
                // The key is the plain JSON field name written by commit(); it has nothing to do
                // with the JSP tag constant the decompiler substituted for it.
                storedTopologyId = (String) ((Map<?, ?>) json.get("topology")).get("id");
                storedOffset = (Long) json.get("offset");
            }
        } catch (Throwable th) {
            LOG.warn("Error reading and/or parsing at ZkNode: " + path, th);
        }

        final String topic = this._partition.topic;
        final Long currentOffset = Long.valueOf(KafkaUtils.getOffset(this._consumer, topic, partition.partition, spoutConfig));

        if (storedTopologyId == null || storedOffset == null) {
            this._committedTo = currentOffset;
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!str.equals(storedTopologyId) && spoutConfig.ignoreZkOffsets) {
            this._committedTo = Long.valueOf(KafkaUtils.getOffset(this._consumer, topic, partition.partition, spoutConfig.startOffsetTime));
            LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
        } else {
            this._committedTo = storedOffset;
            LOG.info("Read last commit offset from zookeeper: " + this._committedTo + "; old topology_id: " + storedTopologyId + " - new topology_id: " + str);
        }

        // Guard against starting too far behind (or from a non-positive offset).
        final boolean tooFarBehind = currentOffset.longValue() - this._committedTo.longValue() > spoutConfig.maxOffsetBehind;
        if (tooFarBehind || this._committedTo.longValue() <= 0) {
            LOG.info("Last commit offset from zookeeper: " + this._committedTo);
            final Long previousCommit = this._committedTo;
            this._committedTo = currentOffset;
            LOG.info("Commit offset " + previousCommit + " is more than " + spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
        }

        LOG.info("Starting Kafka " + this._consumer.host() + ":" + partition.partition + " from offset " + this._committedTo);
        this._emittedToOffset = this._committedTo;

        this._fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
        this._fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
        this._fetchAPICallCount = new CountMetric();
        this._fetchAPIMessageCount = new CountMetric();
        this._lostMessageCount = new CountMetric();
    }

    /**
     * Collects the current value of every per-partition metric and resets each one.
     *
     * @return a new map keyed by {@code "<partition>/<metricName>"}
     */
    public Map getMetricsDataMap() {
        final String partitionLabel = String.valueOf(this._partition);
        final Map<String, Object> metrics = new HashMap<>();
        metrics.put(partitionLabel + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
        metrics.put(partitionLabel + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
        metrics.put(partitionLabel + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
        metrics.put(partitionLabel + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
        metrics.put(partitionLabel + "/lostMessageCount", _lostMessageCount.getValueAndReset());
        return metrics;
    }

    /**
     * Emits the tuples of the next queued message that produces any, refilling the queue
     * from Kafka first if it is empty.
     *
     * <p>Messages whose scheme yields no tuples are acked immediately and skipped.
     *
     * @param spoutOutputCollector collector the tuples are emitted to; each tuple carries a
     *                             {@link KafkaMessageId} of this partition and the message offset
     * @return {@code NO_EMITTED} if the queue ran dry without emitting anything,
     *         {@code EMITTED_MORE_LEFT} if tuples were emitted and messages remain queued,
     *         {@code EMITTED_END} if tuples were emitted and the queue is now empty
     */
    public KafkaSpout.EmitState next(SpoutOutputCollector spoutOutputCollector) {
        if (this._waitingToEmit.isEmpty()) {
            fill();
        }

        MessageAndOffset toEmit;
        while ((toEmit = this._waitingToEmit.pollFirst()) != null) {
            final Iterable<List<Object>> tuples;
            if (this._spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
                // Explicit cast is required to select the metadata-aware overload.
                tuples = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) this._spoutConfig.scheme, toEmit.message(), this._partition, toEmit.offset());
            } else {
                tuples = KafkaUtils.generateTuples(this._spoutConfig, toEmit.message(), this._partition.topic);
            }

            if (tuples == null || !tuples.iterator().hasNext()) {
                // Nothing to emit for this message: mark it done so it does not hold back commits.
                ack(Long.valueOf(toEmit.offset()));
                continue;
            }

            final String streamId = this._spoutConfig.outputStreamId;
            final boolean useNamedStream = !Strings.isNullOrEmpty(streamId);
            for (List<Object> tuple : tuples) {
                final KafkaMessageId messageId = new KafkaMessageId(this._partition, toEmit.offset());
                if (useNamedStream) {
                    // Emit to the configured stream id (the value the guard actually tests),
                    // not to a stream named after the topic.
                    spoutOutputCollector.emit(streamId, tuple, messageId);
                } else {
                    spoutOutputCollector.emit(tuple, messageId);
                }
            }
            return this._waitingToEmit.isEmpty() ? KafkaSpout.EmitState.EMITTED_END : KafkaSpout.EmitState.EMITTED_MORE_LEFT;
        }
        return KafkaSpout.EmitState.NO_EMITTED;
    }

    /**
     * Fetches a batch of messages from Kafka and queues them in {@code _waitingToEmit}.
     *
     * <p>If the retry manager has a failed offset due for retry, the fetch starts there and only
     * messages the retry manager wants retried are queued; otherwise the fetch starts at
     * {@code _emittedToOffset} and every message at or after that offset is queued.
     *
     * <p>If the fetch offset is out of range, the earliest available offset is looked up.
     * On a retry fetch, failed offsets before it are dropped from the retry manager and from
     * {@code _pending} and counted as lost. If {@code _emittedToOffset} is behind the earliest
     * offset it is moved forward and the skipped range is counted as lost.
     */
    private void fill() {
        final long start = System.nanoTime();

        Long offset = this._failedMsgRetryManager.nextFailedMessageToRetry();
        final boolean processingNewTuples = (offset == null);
        if (processingNewTuples) {
            offset = this._emittedToOffset;
        }

        try {
            final ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(this._spoutConfig, this._consumer, this._partition, offset.longValue());

            final long millis = (System.nanoTime() - start) / 1000000;
            this._fetchAPILatencyMax.update(Long.valueOf(millis));
            this._fetchAPILatencyMean.update(Long.valueOf(millis));
            this._fetchAPICallCount.incr();

            if (msgs != null) {
                int queued = 0;
                Iterator<MessageAndOffset> it = msgs.iterator();
                while (it.hasNext()) {
                    final MessageAndOffset msg = it.next();
                    final Long curOffset = Long.valueOf(msg.offset());
                    // Skip messages before the requested offset.
                    if (curOffset.longValue() < offset.longValue()) {
                        continue;
                    }
                    // While retrying, only take messages that are actually due for retry.
                    if (!processingNewTuples && !this._failedMsgRetryManager.shouldRetryMsg(curOffset)) {
                        continue;
                    }
                    queued++;
                    // Keep the original timestamp for offsets that are already pending.
                    if (!this._pending.containsKey(curOffset)) {
                        this._pending.put(curOffset, Long.valueOf(System.currentTimeMillis()));
                    }
                    this._waitingToEmit.add(msg);
                    this._emittedToOffset = Long.valueOf(Math.max(msg.nextOffset(), this._emittedToOffset.longValue()));
                    if (this._failedMsgRetryManager.shouldRetryMsg(curOffset)) {
                        this._failedMsgRetryManager.retryStarted(curOffset);
                    }
                }
                this._fetchAPIMessageCount.incrBy(queued);
            }
        } catch (TopicOffsetOutOfRangeException e) {
            final Long earliest = Long.valueOf(KafkaUtils.getOffset(this._consumer, this._partition.topic, this._partition.partition, OffsetRequest.EarliestTime()));
            if (!processingNewTuples) {
                final Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(earliest);
                if (null != omitted) {
                    this._lostMessageCount.incrBy(omitted.size());
                }
                // Offsets before the earliest available one can never be fetched, hence never
                // acked. Leaving them in _pending would pin lastCompletedOffset() at a dead
                // offset and stop commits from advancing.
                this._pending.headMap(earliest).clear();
                LOG.warn("Removing the failed offsets that are out of range: {}", omitted);
            }
            if (earliest.longValue() > this._emittedToOffset.longValue()) {
                this._lostMessageCount.incrBy(earliest.longValue() - this._emittedToOffset.longValue());
                this._emittedToOffset = earliest;
                LOG.warn("{} Using new offset: {}", Integer.valueOf(this._partition.partition), this._emittedToOffset);
            }
        }
    }

    /**
     * Marks the message at offset {@code l} as successfully processed.
     *
     * <p>Before removing {@code l}, every pending offset more than
     * {@code maxOffsetBehind} below {@code l} is discarded.
     *
     * @param l offset of the acked message
     */
    public void ack(Long l) {
        final long cutoff = l.longValue() - this._spoutConfig.maxOffsetBehind;
        final boolean hasStaleEntries = !this._pending.isEmpty() && this._pending.firstKey().longValue() < cutoff;
        if (hasStaleEntries) {
            // Too much is pending: drop everything strictly below the cutoff.
            this._pending.headMap(Long.valueOf(cutoff)).clear();
        }
        this._pending.remove(l);
        this._failedMsgRetryManager.acked(l);
        this.numberAcked += 1;
    }

    /**
     * Records that the message at offset {@code l} failed and hands it to the retry manager.
     *
     * <p>Failures more than {@code maxOffsetBehind} behind {@code _emittedToOffset} are only logged.
     *
     * @param l offset of the failed message
     * @throws RuntimeException if nothing has been acked yet and the number of failures
     *                          exceeds {@code maxOffsetBehind}
     */
    public void fail(Long l) {
        final boolean tooFarBehind = l.longValue() < this._emittedToOffset.longValue() - this._spoutConfig.maxOffsetBehind;
        if (tooFarBehind) {
            LOG.info("Skipping failed tuple at offset=" + l + " because it's more than maxOffsetBehind=" + this._spoutConfig.maxOffsetBehind + " behind _emittedToOffset=" + this._emittedToOffset);
        } else {
            LOG.debug("failing at offset={} with _pending.size()={} pending and _emittedToOffset={}", l, Integer.valueOf(this._pending.size()), this._emittedToOffset);
            this.numberFailed += 1;
            final boolean nothingAckedYet = this.numberAcked == 0;
            if (nothingAckedYet && this.numberFailed > this._spoutConfig.maxOffsetBehind) {
                throw new RuntimeException("Too many tuple failures");
            }
            this._failedMsgRetryManager.failed(l);
        }
    }

    /**
     * Writes the last completed offset to ZooKeeper under {@code committedPath()}, together with
     * the topology id and name, partition number, broker host/port and topic.
     *
     * <p>Does nothing (beyond a debug log) when the offset has not changed since the last commit.
     */
    public void commit() {
        final long lastCompletedOffset = lastCompletedOffset();
        if (this._committedTo.longValue() == lastCompletedOffset) {
            LOG.debug("No new offset for {} for topology: {}", this._partition, this._topologyInstanceId);
            return;
        }

        LOG.debug("Writing last completed offset ({}) to ZK for {} for topology: {}", Long.valueOf(lastCompletedOffset), this._partition, this._topologyInstanceId);

        // "id" is the JSON field the constructor reads back. The broker host is written as-is,
        // without the bogus Integer cast the decompiler emitted.
        final Map<Object, Object> data = ImmutableMap.builder()
                .put("topology", ImmutableMap.<String, Object>of(
                        "id", this._topologyInstanceId,
                        "name", this._stormConf.get("topology.name")))
                .put("offset", Long.valueOf(lastCompletedOffset))
                .put(StringMessageAndMetadataScheme.STRING_SCHEME_PARTITION_KEY, Integer.valueOf(this._partition.partition))
                .put("broker", ImmutableMap.<String, Object>of(
                        "host", this._partition.host.host,
                        "port", Integer.valueOf(this._partition.host.port)))
                .put("topic", this._partition.topic)
                .build();
        this._state.writeJSON(committedPath(), data);

        this._committedTo = Long.valueOf(lastCompletedOffset);
        LOG.debug("Wrote last completed offset ({}) to ZK for {} for topology: {}", Long.valueOf(lastCompletedOffset), this._partition, this._topologyInstanceId);
    }

    /**
     * Builds the ZooKeeper path for this partition's committed state:
     * {@code <zkRoot>/<spout id>/<partition id>}.
     */
    private String committedPath() {
        return new StringBuilder()
                .append(this._spoutConfig.zkRoot)
                .append("/")
                .append(this._spoutConfig.id)
                .append("/")
                .append(this._partition.getId())
                .toString();
    }

    /**
     * Returns the offset up to which processing is complete.
     *
     * @return the smallest offset still pending, or {@code _emittedToOffset} when nothing is pending
     */
    public long lastCompletedOffset() {
        if (this._pending.isEmpty()) {
            return this._emittedToOffset.longValue();
        }
        return this._pending.firstKey().longValue();
    }

    /**
     * Takes a snapshot of this partition's progress.
     *
     * @return a new {@link OffsetData} holding the current emitted-to offset and
     *         the current {@link #lastCompletedOffset()}
     */
    public OffsetData getOffsetData() {
        final long emitted = this._emittedToOffset.longValue();
        final long completed = lastCompletedOffset();
        return new OffsetData(emitted, completed);
    }

    /**
     * @return the partition this manager was constructed for
     */
    public Partition getPartition() {
        final Partition managed = this._partition;
        return managed;
    }

    /**
     * Commits the last completed offset and releases this partition's consumer registration.
     *
     * <p>The registration is released even when the commit throws, so a failed commit does not
     * leak the connection; the commit exception still propagates to the caller.
     */
    public void close() {
        try {
            commit();
        } finally {
            this._connections.unregister(this._partition.host, this._partition.topic, this._partition.partition);
        }
    }
}
