/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.spout.trident;

import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.kafka.spout.TopicPartitionComparator;
import org.apache.storm.kafka.spout.internal.ConsumerFactory;
import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutBatchMetadata;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutTopicPartition;
import org.apache.storm.kafka.spout.trident.TopicPartitionSerializer;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls a Kafka {@link Consumer} for the records of one topic partition at a time and emits them
 * through a {@link TridentCollector}.
 *
 * <p>Batch boundaries are described by {@link KafkaTridentSpoutBatchMetadata} (first offset, last
 * offset and the id of the topology that emitted the batch), which is exchanged with callers in
 * its {@code Map} form.
 *
 * <p>While a batch is being emitted for a partition, every other partition currently assigned to
 * the consumer is paused, and resumed again once the batch has been handled.
 *
 * @param <K> the Kafka record key type
 * @param <V> the Kafka record value type
 */
public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
    private static final long serialVersionUID = -7343927794834130435L;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);

    private final Consumer<K, V> consumer;
    private final KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig;
    private final TopicAssigner topicAssigner;
    /** The consumer position recorded the first time this instance seeks on each partition. */
    private final Map<TopicPartition, Long> tpToFirstSeekOffset = new HashMap<>();
    private final long pollTimeoutMs;
    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
    private final RecordTranslator<K, V> translator;
    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
    private final TopologyContext topologyContext;

    /**
     * Creates an emitter using the default consumer factory and topic assigner.
     *
     * @param kafkaSpoutConfig the spout configuration
     * @param topologyContext the context of the task this emitter runs in
     */
    public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext) {
        this(kafkaSpoutConfig, topologyContext, new ConsumerFactoryDefault<>(), new TopicAssigner());
    }

    /**
     * Creates an emitter with explicit collaborators; the Kafka consumer is created immediately
     * from the configured Kafka properties.
     *
     * @param kafkaSpoutConfig the spout configuration
     * @param topologyContext the context of the task this emitter runs in
     * @param consumerFactory factory used to create the Kafka consumer
     * @param topicAssigner helper used to assign partitions to the consumer
     */
    @VisibleForTesting
    KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext, ConsumerFactory<K, V> consumerFactory, TopicAssigner topicAssigner) {
        this.kafkaSpoutConfig = kafkaSpoutConfig;
        this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
        this.topologyContext = topologyContext;
        this.translator = kafkaSpoutConfig.getTranslator();
        this.topicAssigner = topicAssigner;
        this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
        this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
        LOG.debug("Created {}", this.toString());
    }

    /**
     * Emits again the records of a previously emitted batch.
     *
     * <p>The consumer seeks to the first offset of the batch, polls once, and emits the polled
     * records up to and including the last offset of the batch. Nothing is emitted when the batch
     * was emitted by a different topology and the first poll offset strategy is {@code EARLIEST}
     * or {@code LATEST}, or when the batch is an empty batch with a negative starting offset.
     *
     * @param tx the transaction attempt (only used for logging)
     * @param collector the collector the tuples are emitted to
     * @param currBatchPartition the partition the batch belongs to
     * @param currBatch the batch metadata in map form
     * @throws IllegalStateException if the partition is not assigned to the consumer
     * @throws RuntimeException if a polled record lies beyond the offset directly after the end
     *     of the batch
     */
    public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collector, KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> currBatch) {
        final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
        throwIfEmittingForUnassignedPartition(currBatchTp);

        final KafkaTridentSpoutBatchMetadata currBatchMeta = KafkaTridentSpoutBatchMetadata.fromMap(currBatch);
        Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();

        if (!this.topologyContext.getStormId().equals(currBatchMeta.getTopologyId()) && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
            LOG.debug("Skipping re-emit of batch that was originally emitted by another topology, because the current first poll offset strategy ignores committed offsets.");
            return;
        }

        LOG.debug("Re-emitting batch: [transaction= {}], [currBatchPartition = {}], [currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, currBatch, collector);
        try {
            // Pause everything else so the poll below only returns records for currBatchTp.
            pausedTopicPartitions = pauseTopicPartitions(currBatchTp);

            final long seekOffset = currBatchMeta.getFirstOffset();
            if (seekOffset < 0L && currBatchMeta.getFirstOffset() == currBatchMeta.getLastOffset()) {
                LOG.debug("Skipping re-emit of batch with negative starting offset. The spout may set a negative starting offset for an empty batch that occurs at the start of a partition. It is not expected that Trident will replay such an empty batch, but this guard is here in case it tries to do so. See STORM-2990, STORM-3279 for context.");
                return;
            }

            LOG.debug("Seeking to offset [{}] for topic partition [{}]", seekOffset, currBatchTp);
            this.consumer.seek(currBatchTp, seekOffset);

            final ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeoutMs);
            LOG.debug("Polled [{}] records from Kafka.", records.count());

            for (ConsumerRecord<K, V> record : records) {
                if (record.offset() == currBatchMeta.getLastOffset() + 1L) {
                    // Reached the record directly after the batch: the batch is complete.
                    break;
                }
                if (record.offset() > currBatchMeta.getLastOffset()) {
                    // Skipped past the end of the batch without seeing the offset right after it.
                    throw new RuntimeException(String.format("Error when re-emitting batch. Overshot the end of the batch. The batch end offset was [%d], but received [%d]. Ensure log compaction is disabled in Kafka, since it is incompatible with non-opaque transactional spouts.", currBatchMeta.getLastOffset(), record.offset()));
                }
                emitTuple(collector, record);
            }
        } finally {
            this.consumer.resume(pausedTopicPartitions);
            LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
        }
        LOG.debug("Re-emitted batch: [transaction = {}], [currBatchPartition = {}], [currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, currBatchMeta, collector);
    }

    /**
     * Emits a new batch for the given partition and returns the metadata describing it.
     *
     * <p>The consumer is positioned by {@link #seek(TopicPartition, KafkaTridentSpoutBatchMetadata)},
     * polled once, and every record returned for the partition is emitted. If the poll returns no
     * records, the returned metadata uses {@code position - 1} as both first and last offset.
     *
     * @param tx the transaction attempt (only used for logging)
     * @param collector the collector the tuples are emitted to
     * @param currBatchPartition the partition to emit a batch for
     * @param lastBatch the metadata of the previous batch in map form, or {@code null} if there is none
     * @return the metadata of the emitted batch in map form
     * @throws IllegalStateException if the partition is not assigned to the consumer
     */
    public Map<String, Object> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector, KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {
        LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, collector);

        final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
        throwIfEmittingForUnassignedPartition(currBatchTp);

        final KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
        final KafkaTridentSpoutBatchMetadata currentBatch;
        Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();

        try {
            // Pause everything else so the poll below only returns records for currBatchTp.
            pausedTopicPartitions = pauseTopicPartitions(currBatchTp);

            seek(currBatchTp, lastBatchMeta);

            final List<ConsumerRecord<K, V>> records = this.consumer.poll(this.pollTimeoutMs).records(currBatchTp);
            LOG.debug("Polled [{}] records from Kafka.", records.size());

            if (!records.isEmpty()) {
                for (ConsumerRecord<K, V> record : records) {
                    emitTuple(collector, record);
                }
                currentBatch = new KafkaTridentSpoutBatchMetadata(records.get(0).offset(), records.get(records.size() - 1).offset(), this.topologyContext.getStormId());
            } else {
                // Empty batch: first == last == the offset just before the current position.
                final long lastEmittedOffset = this.consumer.position(currBatchTp) - 1L;
                currentBatch = new KafkaTridentSpoutBatchMetadata(lastEmittedOffset, lastEmittedOffset, this.topologyContext.getStormId());
            }
        } finally {
            this.consumer.resume(pausedTopicPartitions);
            LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
        }
        LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
        return currentBatch.toMap();
    }

    /** Returns whether the configured first poll offset strategy is {@code EARLIEST} or {@code LATEST}. */
    private boolean isFirstPollOffsetStrategyIgnoringCommittedOffsets() {
        return this.firstPollOffsetStrategy == FirstPollOffsetStrategy.EARLIEST || this.firstPollOffsetStrategy == FirstPollOffsetStrategy.LATEST;
    }

    /**
     * Verifies that the given partition is part of the consumer's current assignment.
     *
     * @throws IllegalStateException if it is not
     */
    private void throwIfEmittingForUnassignedPartition(TopicPartition currBatchTp) {
        final Set<TopicPartition> assignments = this.consumer.assignment();
        if (!assignments.contains(currBatchTp)) {
            throw new IllegalStateException("The spout is asked to emit tuples on a partition it is not assigned. This indicates a bug in the TopicFilter or ManualPartitioner implementations. The current partition is [" + currBatchTp + "], the assigned partitions are [" + assignments + "].");
        }
    }

    /** Translates the record with the configured translator and emits the resulting tuple. */
    private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record) {
        final List<Object> tuple = this.translator.apply(record);
        collector.emit(tuple);
        LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
    }

    /**
     * Positions the consumer on the offset the next batch for {@code tp} should start at.
     *
     * <p>On the first call for a partition the position is chosen from the first poll offset
     * strategy and the last batch metadata (if none of the cases below match, no seek is
     * performed), and the resulting position is remembered. On later calls the consumer seeks to
     * the offset after the last batch, or, if there is no last batch metadata, back to the
     * remembered first position.
     *
     * @param tp the partition to seek on
     * @param lastBatchMeta the metadata of the previous batch, or {@code null} if there is none
     * @return the consumer position after seeking
     */
    private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
        if (isFirstPollSinceExecutorStarted(tp)) {
            // No last batch, or a last batch emitted under a different topology id.
            final boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null || !this.topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());

            if (this.firstPollOffsetStrategy == FirstPollOffsetStrategy.EARLIEST && isFirstPollSinceTopologyWasDeployed) {
                LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
                this.consumer.seekToBeginning(Collections.singleton(tp));
            } else if (this.firstPollOffsetStrategy == FirstPollOffsetStrategy.LATEST && isFirstPollSinceTopologyWasDeployed) {
                LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
                this.consumer.seekToEnd(Collections.singleton(tp));
            } else if (lastBatchMeta != null) {
                LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
                this.consumer.seek(tp, lastBatchMeta.getLastOffset() + 1L);
            } else if (this.firstPollOffsetStrategy == FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) {
                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
                this.consumer.seekToBeginning(Collections.singleton(tp));
            } else if (this.firstPollOffsetStrategy == FirstPollOffsetStrategy.UNCOMMITTED_LATEST) {
                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
                this.consumer.seekToEnd(Collections.singleton(tp));
            }
            // Remember where the first batch started so it can be found again later.
            this.tpToFirstSeekOffset.put(tp, this.consumer.position(tp));
        } else if (lastBatchMeta != null) {
            this.consumer.seek(tp, lastBatchMeta.getLastOffset() + 1L);
            LOG.debug("Not the first poll for topic partition [{}], using last batch metadata", tp);
        } else {
            // Not the first poll, yet no last batch metadata: go back to the remembered first position.
            final long initialFetchOffset = this.tpToFirstSeekOffset.get(tp);
            this.consumer.seek(tp, initialFetchOffset);
            LOG.debug("Not the first poll for topic partition [{}], no last batch metadata present. Using stored initial fetch offset [{}]", tp, initialFetchOffset);
        }

        final long fetchOffset = this.consumer.position(tp);
        LOG.debug("Set [fetchOffset = {}] for partition [{}]", fetchOffset, tp);
        return fetchOffset;
    }

    /** Returns whether this instance has not yet recorded a first seek offset for {@code tp}. */
    private boolean isFirstPollSinceExecutorStarted(TopicPartition tp) {
        return !this.tpToFirstSeekOffset.containsKey(tp);
    }

    /**
     * Pauses every partition currently assigned to the consumer except {@code excludedTp}.
     *
     * @return the partitions that were paused
     */
    private Collection<TopicPartition> pauseTopicPartitions(TopicPartition excludedTp) {
        final Set<TopicPartition> pausedTopicPartitions = new HashSet<>(this.consumer.assignment());
        LOG.debug("Currently assigned topic-partitions {}", pausedTopicPartitions);
        pausedTopicPartitions.remove(excludedTp);
        this.consumer.pause(pausedTopicPartitions);
        LOG.debug("Paused topic-partitions {}", pausedTopicPartitions);
        return pausedTopicPartitions;
    }

    /**
     * Deserializes the given partition descriptions and returns them sorted with
     * {@link TopicPartitionComparator}.
     *
     * @param allPartitionInfo the partitions in the map form understood by {@link TopicPartitionSerializer}
     * @return the sorted partitions
     */
    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(List<Map<String, Object>> allPartitionInfo) {
        final List<TopicPartition> sortedPartitions = allPartitionInfo.stream()
            .map(map -> this.tpSerializer.fromMap(map))
            .sorted(TopicPartitionComparator.INSTANCE)
            .collect(Collectors.toList());
        final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(sortedPartitions);
        LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ", allPartitions, this.topologyContext.getThisTaskIndex(), getNumTasks());
        return allPartitions;
    }

    /**
     * Returns the subset of the given partitions that the configured topic partitioner selects
     * for this task.
     *
     * @param taskId the task id (only used for logging)
     * @param numTasks the number of tasks (not used by this implementation)
     * @param allPartitionInfoSorted all partitions
     * @return the partitions selected for this task
     */
    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks, List<KafkaTridentSpoutTopicPartition> allPartitionInfoSorted) {
        final List<TopicPartition> tps = allPartitionInfoSorted.stream()
            .map(KafkaTridentSpoutTopicPartition::getTopicPartition)
            .collect(Collectors.toList());
        final Set<TopicPartition> assignedTps = this.kafkaSpoutConfig.getTopicPartitioner().getPartitionsForThisTask(tps, this.topologyContext);
        LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", this.consumer, taskId, assignedTps);
        return newKafkaTridentSpoutTopicPartitions(assignedTps);
    }

    /**
     * Assigns the given partitions to the consumer via the topic assigner.
     *
     * @param partitionResponsibilities the partitions this task is responsible for
     */
    public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
        final Set<TopicPartition> assignedTps = partitionResponsibilities.stream()
            .map(KafkaTridentSpoutTopicPartition::getTopicPartition)
            .collect(Collectors.toSet());
        this.topicAssigner.assignPartitions(this.consumer, assignedTps, new KafkaSpoutConsumerRebalanceListener());
        LOG.debug("Assigned partitions [{}] to this task", assignedTps);
    }

    /** Wraps each topic partition in a {@link KafkaTridentSpoutTopicPartition}, preserving iteration order. */
    private List<KafkaTridentSpoutTopicPartition> newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> tps) {
        final List<KafkaTridentSpoutTopicPartition> kttp = new ArrayList<>(tps.size());
        for (TopicPartition tp : tps) {
            LOG.trace("Added topic-partition [{}]", tp);
            kttp.add(new KafkaTridentSpoutTopicPartition(tp));
        }
        return kttp;
    }

    /** Returns the number of tasks of the component this emitter belongs to. */
    private int getNumTasks() {
        return this.topologyContext.getComponentTasks(this.topologyContext.getThisComponentId()).size();
    }

    /** Closes the underlying Kafka consumer. */
    public void close() {
        this.consumer.close();
        LOG.debug("Closed");
    }

    @Override
    public final String toString() {
        return super.toString() + "{kafkaSpoutConfig=" + this.kafkaSpoutConfig + '}';
    }

    /** Rebalance listener that only logs the revoked and assigned partitions. */
    private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
        private KafkaSpoutConsumerRebalanceListener() {
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            LOG.info("Partitions revoked. [consumer={}, topic-partitions={}]", KafkaTridentSpoutEmitter.this.consumer, partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            LOG.info("Partitions reassignment. [consumer={}, topic-partitions={}]", KafkaTridentSpoutEmitter.this.consumer, partitions);
        }
    }
}

