/*
 * Decompiled with CFR 0.152.
 */
package storm.kafka;

import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.DynamicPartitionConnections;
import storm.kafka.FailedFetchException;
import storm.kafka.KafkaUtils;
import storm.kafka.Partition;
import storm.kafka.PartitionCoordinator;
import storm.kafka.PartitionManager;
import storm.kafka.SpoutConfig;
import storm.kafka.StaticCoordinator;
import storm.kafka.StaticHosts;
import storm.kafka.ZkCoordinator;
import storm.kafka.ZkState;

public class KafkaSpout
extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
    String _uuid = UUID.randomUUID().toString();
    SpoutConfig _spoutConfig;
    SpoutOutputCollector _collector;
    PartitionCoordinator _coordinator;
    DynamicPartitionConnections _connections;
    ZkState _state;
    long _lastUpdateMs = 0L;
    int _currPartitionIndex = 0;

    public KafkaSpout(SpoutConfig spoutConf) {
        this._spoutConfig = spoutConf;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        Integer zkPort;
        this._collector = collector;
        HashMap<String, Object> stateConf = new HashMap<String, Object>(conf);
        List zkServers = this._spoutConfig.zkServers;
        if (zkServers == null) {
            zkServers = (List)conf.get("storm.zookeeper.servers");
        }
        if ((zkPort = this._spoutConfig.zkPort) == null) {
            zkPort = ((Number)conf.get("storm.zookeeper.port")).intValue();
        }
        stateConf.put("transactional.zookeeper.servers", zkServers);
        stateConf.put("transactional.zookeeper.port", zkPort);
        stateConf.put("transactional.zookeeper.root", this._spoutConfig.zkRoot);
        this._state = new ZkState(stateConf);
        this._connections = new DynamicPartitionConnections(this._spoutConfig, KafkaUtils.makeBrokerReader(conf, this._spoutConfig));
        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
        this._coordinator = this._spoutConfig.hosts instanceof StaticHosts ? new StaticCoordinator(this._connections, conf, this._spoutConfig, this._state, context.getThisTaskIndex(), totalTasks, this._uuid) : new ZkCoordinator(this._connections, conf, this._spoutConfig, this._state, context.getThisTaskIndex(), totalTasks, this._uuid);
        context.registerMetric("kafkaOffset", new IMetric(){
            KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;
            {
                this._kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(KafkaSpout.this._spoutConfig.topic, KafkaSpout.this._connections);
            }

            public Object getValueAndReset() {
                List<PartitionManager> pms = KafkaSpout.this._coordinator.getMyManagedPartitions();
                HashSet<Partition> latestPartitions = new HashSet<Partition>();
                for (PartitionManager pm : pms) {
                    latestPartitions.add(pm.getPartition());
                }
                this._kafkaOffsetMetric.refreshPartitions(latestPartitions);
                for (PartitionManager pm : pms) {
                    this._kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
                }
                return this._kafkaOffsetMetric.getValueAndReset();
            }
        }, this._spoutConfig.metricsTimeBucketSizeInSecs);
        context.registerMetric("kafkaPartition", new IMetric(){

            public Object getValueAndReset() {
                List<PartitionManager> pms = KafkaSpout.this._coordinator.getMyManagedPartitions();
                HashMap concatMetricsDataMaps = new HashMap();
                for (PartitionManager pm : pms) {
                    concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
                }
                return concatMetricsDataMaps;
            }
        }, this._spoutConfig.metricsTimeBucketSizeInSecs);
    }

    public void close() {
        this._state.close();
    }

    public void nextTuple() {
        long now;
        List<PartitionManager> managers = this._coordinator.getMyManagedPartitions();
        for (int i = 0; i < managers.size(); ++i) {
            try {
                this._currPartitionIndex %= managers.size();
                EmitState state = managers.get(this._currPartitionIndex).next(this._collector);
                if (state != EmitState.EMITTED_MORE_LEFT) {
                    this._currPartitionIndex = (this._currPartitionIndex + 1) % managers.size();
                }
                if (state == EmitState.NO_EMITTED) continue;
                break;
            }
            catch (FailedFetchException e) {
                LOG.warn("Fetch failed", (Throwable)e);
                this._coordinator.refresh();
            }
        }
        if ((now = System.currentTimeMillis()) - this._lastUpdateMs > this._spoutConfig.stateUpdateIntervalMs) {
            this.commit();
        }
    }

    public void ack(Object msgId) {
        PartitionManager.KafkaMessageId id = (PartitionManager.KafkaMessageId)msgId;
        PartitionManager m = this._coordinator.getManager(id.partition);
        if (m != null) {
            m.ack(id.offset);
        }
    }

    public void fail(Object msgId) {
        PartitionManager.KafkaMessageId id = (PartitionManager.KafkaMessageId)msgId;
        PartitionManager m = this._coordinator.getManager(id.partition);
        if (m != null) {
            m.fail(id.offset);
        }
    }

    public void deactivate() {
        this.commit();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(this._spoutConfig.scheme.getOutputFields());
    }

    private void commit() {
        this._lastUpdateMs = System.currentTimeMillis();
        for (PartitionManager manager : this._coordinator.getMyManagedPartitions()) {
            manager.commit();
        }
    }

    static enum EmitState {
        EMITTED_MORE_LEFT,
        EMITTED_END,
        NO_EMITTED;

    }

    public static class MessageAndRealOffset {
        public Message msg;
        public long offset;

        public MessageAndRealOffset(Message msg, long offset) {
            this.msg = msg;
            this.offset = offset;
        }
    }
}

