/*
 * Decompiled with CFR 0.152.
 */
package backtype.storm.testing;

import backtype.storm.spout.ISpoutOutputCollector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.utils.RegisteredGlobalState;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Spout decorator that forwards every call to a wrapped {@link IRichSpout}
 * while counting its activity.
 *
 * <p>Counts are kept in a {@code Map} obtained from
 * {@link RegisteredGlobalState#getState} under the track id given at
 * construction time. Two entries of that map are touched, each expected to be
 * an {@link AtomicInteger}:
 * <ul>
 *   <li>{@code "processed"} - incremented once per {@link #ack} and once per
 *       {@link #fail};</li>
 *   <li>{@code "spout-emitted"} - incremented once per {@code emit} /
 *       {@code emitDirect} issued by the wrapped spout.</li>
 * </ul>
 * The map and its counters must already be registered under the track id;
 * this class never creates them.
 */
public class SpoutTracker extends BaseRichSpout {
    /** Stats-map key of the counter bumped for every acked or failed message. */
    private static final String PROCESSED_KEY = "processed";
    /** Stats-map key of the counter bumped for every tuple the delegate emits. */
    private static final String SPOUT_EMITTED_KEY = "spout-emitted";

    IRichSpout _delegate;
    SpoutTrackOutputCollector _tracker;
    String _trackId;

    /**
     * @param delegate the spout whose calls are forwarded and counted
     * @param trackId  id passed to {@link RegisteredGlobalState#getState} to
     *                 locate the stats map
     */
    public SpoutTracker(IRichSpout delegate, String trackId) {
        this._delegate = delegate;
        this._trackId = trackId;
    }

    /**
     * Opens the delegate with a collector that counts emits before handing
     * them on to {@code collector}.
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this._tracker = new SpoutTrackOutputCollector(collector);
        this._delegate.open(conf, context, new SpoutOutputCollector(this._tracker));
    }

    /** Forwards to the delegate. */
    @Override
    public void close() {
        this._delegate.close();
    }

    /**
     * Forwards to the delegate. Without this override the inherited
     * implementation would run instead and the wrapped spout would never see
     * the callback.
     */
    @Override
    public void activate() {
        this._delegate.activate();
    }

    /**
     * Forwards to the delegate. Without this override the inherited
     * implementation would run instead and the wrapped spout would never see
     * the callback.
     */
    @Override
    public void deactivate() {
        this._delegate.deactivate();
    }

    /** Forwards to the delegate. */
    @Override
    public void nextTuple() {
        this._delegate.nextTuple();
    }

    /**
     * Forwards the ack to the delegate, then increments the
     * {@code "processed"} counter.
     */
    @Override
    public void ack(Object msgId) {
        this._delegate.ack(msgId);
        this.incrementStat(PROCESSED_KEY);
    }

    /**
     * Forwards the failure to the delegate, then increments the
     * {@code "processed"} counter (acks and failures share one counter).
     */
    @Override
    public void fail(Object msgId) {
        this._delegate.fail(msgId);
        this.incrementStat(PROCESSED_KEY);
    }

    /** Forwards to the delegate. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        this._delegate.declareOutputFields(declarer);
    }

    /**
     * Increments the {@link AtomicInteger} stored under {@code key} in the
     * stats map registered for this tracker's id.
     *
     * <p>The map is looked up on every call rather than cached in a field.
     */
    private void incrementStat(String key) {
        Map<?, ?> stats = (Map<?, ?>) RegisteredGlobalState.getState(this._trackId);
        ((AtomicInteger) stats.get(key)).incrementAndGet();
    }

    /**
     * Collector handed to the delegate: passes every call through to the real
     * collector and counts each emit.
     */
    private class SpoutTrackOutputCollector implements ISpoutOutputCollector {
        private final SpoutOutputCollector _collector;

        SpoutTrackOutputCollector(SpoutOutputCollector collector) {
            this._collector = collector;
        }

        /** Emits through the real collector, then counts the emit. */
        @Override
        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            List<Integer> ret = this._collector.emit(streamId, tuple, messageId);
            SpoutTracker.this.incrementStat(SPOUT_EMITTED_KEY);
            return ret;
        }

        /** Emits directly through the real collector, then counts the emit. */
        @Override
        public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
            this._collector.emitDirect(taskId, streamId, tuple, messageId);
            SpoutTracker.this.incrementStat(SPOUT_EMITTED_KEY);
        }

        /** Passes the error to the real collector; errors are not counted. */
        @Override
        public void reportError(Throwable error) {
            this._collector.reportError(error);
        }
    }
}

