/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.spout.trident;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.kafka.spout.internal.Timer;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutBatchMetadata;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutTopicPartition;
import org.apache.storm.kafka.spout.trident.TopicPartitionSerializer;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Opaque partitioned Trident emitter that reads from Kafka.
 *
 * <p>A single {@link KafkaConsumer} is shared by every topic-partition this emitter handles. To
 * emit a batch for one partition, the emitter pauses all other assigned partitions, seeks the
 * target partition, polls once, emits the polled records through the configured
 * {@link RecordTranslator}, and finally resumes the partitions it paused.
 *
 * @param <K> Kafka record key type
 * @param <V> Kafka record value type
 */
public class KafkaTridentSpoutEmitter<K, V>
        implements IOpaquePartitionedTridentSpout.Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, Map<String, Object>>,
        Serializable {
    private static final long serialVersionUID = -7343927794834130435L;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);

    private final KafkaConsumer<K, V> kafkaConsumer;
    private final KafkaTridentSpoutManager<K, V> kafkaManager;
    /** Transaction id of the first batch for which each topic-partition was polled by this instance. */
    private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<TopicPartition, Long>();
    private final long pollTimeoutMs;
    private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
    private final RecordTranslator<K, V> translator;
    private final Timer refreshSubscriptionTimer;
    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
    private final TopologyContext topologyContext;

    /**
     * Creates an emitter, obtaining a subscribed consumer from {@code kafkaManager}.
     *
     * @param kafkaManager source of the Kafka consumer and of the spout configuration
     * @param topologyContext context of the task this emitter runs in
     * @param refreshSubscriptionTimer timer deciding when the subscription's assignment is refreshed
     */
    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
        this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
        this.kafkaManager = kafkaManager;
        this.topologyContext = topologyContext;
        this.refreshSubscriptionTimer = refreshSubscriptionTimer;

        KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
        this.translator = kafkaSpoutConfig.getTranslator();
        this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
        this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
        LOG.debug("Created {}", this);
    }

    /**
     * Creates an emitter whose refresh timer is built from a fixed value of 500 and the configured
     * partition refresh period, both in milliseconds.
     * NOTE(review): the 500 is presumably the timer's initial delay - confirm against {@link Timer}.
     *
     * @param kafkaManager source of the Kafka consumer and of the spout configuration
     * @param topologyContext context of the task this emitter runs in
     */
    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext) {
        this(kafkaManager, topologyContext, new Timer(500L, kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS));
    }

    /**
     * Emits one batch for {@code currBatchPartition}.
     *
     * <p>If the partition is not currently assigned to this emitter's consumer, nothing is polled
     * and a warning is logged. Otherwise all other assigned partitions are paused, the consumer is
     * positioned via {@link #seek}, a single poll is performed and the polled records are emitted.
     * Paused partitions are always resumed, even if seeking, polling or emitting throws.
     *
     * @param tx transaction attempt this batch belongs to
     * @param collector collector receiving the translated tuples
     * @param currBatchPartition partition to emit a batch for
     * @param lastBatch metadata map of the previous batch for this partition, or {@code null}
     * @return the metadata map of the batch built from the polled records; when nothing was
     *     polled, the metadata derived from {@code lastBatch}, or {@code null} if that is
     *     {@code null} too
     */
    public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {
        LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, collector);

        TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
        Set<TopicPartition> assignments = kafkaConsumer.assignment();
        KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
        // Falls back to the previous metadata when this batch turns out to be empty or is skipped.
        KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
        Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();

        if (assignments == null || !assignments.contains(currBatchTp)) {
            LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}] because it is not part of the assignments {} of consumer instance [{}] of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments, kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
        } else {
            try {
                // Pause everything else so that the poll below only returns records of currBatchTp.
                pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
                seek(currBatchTp, lastBatchMeta, tx.getTransactionId());
                if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
                    kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
                }
                ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
                LOG.debug("Polled [{}] records from Kafka.", records.count());
                if (!records.isEmpty()) {
                    emitTuples(collector, records);
                    currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records);
                }
            } finally {
                kafkaConsumer.resume(pausedTopicPartitions);
                LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
            }
            LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
        }
        return currentBatch == null ? null : currentBatch.toMap();
    }

    /** Translates every polled record into a tuple and emits it on {@code collector}. */
    private void emitTuples(TridentCollector collector, ConsumerRecords<K, V> records) {
        for (ConsumerRecord<K, V> record : records) {
            List<Object> tuple = translator.apply(record);
            collector.emit(tuple);
            LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
        }
    }

    /**
     * Positions the consumer on {@code tp} for the next poll.
     *
     * <p>On the first poll of a partition (see {@link #isFirstPoll}) the configured first-poll
     * offset strategy decides the position and the transaction id is remembered. On later polls
     * the consumer is moved to the offset following the last offset of the previous batch. When a
     * later poll has no previous batch metadata (e.g. every earlier poll returned no records), the
     * consumer is left at its current position instead of dereferencing the missing metadata.
     *
     * @param tp partition to position
     * @param lastBatchMeta metadata of the previous batch, or {@code null} if there is none
     * @param transactionId id of the transaction being emitted
     * @return the consumer's position on {@code tp} after seeking
     */
    private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta, long transactionId) {
        if (isFirstPoll(tp, transactionId)) {
            seekForFirstPoll(tp, lastBatchMeta);
            firstPollTransaction.put(tp, transactionId);
        } else if (lastBatchMeta != null) {
            kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1L);
            LOG.debug("Subsequent poll for topic partition [{}], using last batch metadata", tp);
        } else {
            // No batch has produced metadata yet; keep polling from wherever the consumer already is.
            LOG.debug("Subsequent poll for topic partition [{}] with no last batch metadata, keeping current consumer position", tp);
        }
        long fetchOffset = kafkaConsumer.position(tp);
        LOG.debug("Set [fetchOffset = {}]", fetchOffset);
        return fetchOffset;
    }

    /**
     * Applies the first-poll offset strategy to {@code tp}. EARLIEST and LATEST always win; the
     * UNCOMMITTED_* strategies are only consulted when there is no previous batch metadata. No
     * seek is performed if none of the branches applies.
     */
    private void seekForFirstPoll(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
        if (firstPollOffsetStrategy == KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST) {
            LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
            kafkaConsumer.seekToBeginning(Collections.singleton(tp));
        } else if (firstPollOffsetStrategy == KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST) {
            LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
            kafkaConsumer.seekToEnd(Collections.singleton(tp));
        } else if (lastBatchMeta != null) {
            LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
            kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1L);
        } else if (firstPollOffsetStrategy == KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) {
            LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
            kafkaConsumer.seekToBeginning(Collections.singleton(tp));
        } else if (firstPollOffsetStrategy == KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST) {
            LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
            kafkaConsumer.seekToEnd(Collections.singleton(tp));
        }
    }

    /**
     * Returns {@code true} if {@code tp} has not been polled by this instance yet, or if
     * {@code txid} is the transaction that performed its first poll (i.e. that batch is replayed).
     */
    private boolean isFirstPoll(TopicPartition tp, long txid) {
        Long firstTxid = firstPollTransaction.get(tp);
        return firstTxid == null || firstTxid == txid;
    }

    /**
     * Pauses every currently assigned partition except {@code excludedTp}.
     *
     * @return the partitions that were paused, so the caller can resume exactly those
     */
    private Collection<TopicPartition> pauseTopicPartitions(TopicPartition excludedTp) {
        Set<TopicPartition> toPause = new HashSet<TopicPartition>(kafkaConsumer.assignment());
        LOG.debug("Currently assigned topic-partitions {}", toPause);
        toPause.remove(excludedTp);
        kafkaConsumer.pause(toPause);
        LOG.debug("Paused topic-partitions {}", toPause);
        return toPause;
    }

    /** Does nothing besides logging; this emitter takes no action when partitions are refreshed. */
    public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
        LOG.trace("Refreshing of topic-partitions handled by Kafka. No action taken by this method for topic partitions {}", partitionResponsibilities);
    }

    /**
     * Converts the serialized partition descriptions into {@link KafkaTridentSpoutTopicPartition}s,
     * preserving the order of {@code allPartitionInfo}.
     *
     * @param allPartitionInfo one map per topic-partition, as understood by {@link TopicPartitionSerializer}
     * @return one wrapper per entry of {@code allPartitionInfo}
     */
    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(List<Map<String, Object>> allPartitionInfo) {
        List<TopicPartition> allTopicPartitions = new ArrayList<TopicPartition>(allPartitionInfo.size());
        for (Map<String, Object> serializedTp : allPartitionInfo) {
            allTopicPartitions.add(tpSerializer.fromMap(serializedTp));
        }
        List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions);
        LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ", allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
        return allPartitions;
    }

    /**
     * Returns the partitions currently assigned to this emitter's consumer.
     *
     * <p>{@code numTasks} and {@code allPartitionInfo} are ignored and {@code taskId} is only used
     * for logging; the consumer's own assignment is the sole source of the result.
     */
    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks, List<Map<String, Object>> allPartitionInfo) {
        Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
        LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
        List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);
        LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId);
        return taskTps;
    }

    /** Wraps each topic-partition; a {@code null} collection yields an empty, mutable list. */
    private List<KafkaTridentSpoutTopicPartition> newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> tps) {
        if (tps == null) {
            return new ArrayList<KafkaTridentSpoutTopicPartition>(0);
        }
        List<KafkaTridentSpoutTopicPartition> wrapped = new ArrayList<KafkaTridentSpoutTopicPartition>(tps.size());
        for (TopicPartition tp : tps) {
            LOG.trace("Added topic-partition [{}]", tp);
            wrapped.add(new KafkaTridentSpoutTopicPartition(tp));
        }
        return wrapped;
    }

    /** Number of tasks of the component this emitter belongs to, according to the topology context. */
    private int getNumTasks() {
        return topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
    }

    /** Closes the underlying Kafka consumer. */
    public void close() {
        kafkaConsumer.close();
        LOG.debug("Closed");
    }

    @Override
    public final String toString() {
        return super.toString() + "{kafkaManager=" + kafkaManager + '}';
    }
}

