/*
 * Decompiled with CFR 0.152.
 */
package storm.trident.spout;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.ICommitterTridentSpout;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.BatchInfo;
import storm.trident.topology.ITridentBatchBolt;
import storm.trident.topology.TransactionAttempt;
import storm.trident.tuple.ConsList;

public class TridentSpoutExecutor
implements ITridentBatchBolt {
    public static String ID_FIELD = "$tx";
    public static Logger LOG = LoggerFactory.getLogger(TridentSpoutExecutor.class);
    AddIdCollector _collector;
    ITridentSpout _spout;
    ITridentSpout.Emitter _emitter;
    String _streamName;
    String _txStateId;
    TreeMap<Long, TransactionAttempt> _activeBatches = new TreeMap();

    public TridentSpoutExecutor(String txStateId, String streamName, ITridentSpout spout) {
        this._txStateId = txStateId;
        this._spout = spout;
        this._streamName = streamName;
    }

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector) {
        this._emitter = this._spout.getEmitter(this._txStateId, conf, context);
        this._collector = new AddIdCollector(this._streamName, collector);
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public void execute(BatchInfo info, Tuple input) {
        TransactionAttempt attempt = (TransactionAttempt)input.getValue(0);
        if (input.getSourceStreamId().equals("$commit")) {
            if (!attempt.equals(this._activeBatches.get(attempt.getTransactionId()))) throw new FailedException("Received commit for different transaction attempt");
            ((ICommitterTridentSpout.Emitter)this._emitter).commit(attempt);
            this._activeBatches.remove(attempt.getTransactionId());
            return;
        } else if (input.getSourceStreamId().equals("$success")) {
            this._activeBatches.headMap(attempt.getTransactionId()).clear();
            this._emitter.success(attempt);
            return;
        } else {
            this._collector.setBatch(info.batchId);
            this._emitter.emitBatch(attempt, input.getValue(1), this._collector);
            this._activeBatches.put(attempt.getTransactionId(), attempt);
        }
    }

    @Override
    public void cleanup() {
        this._emitter.close();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        ArrayList<String> fields = new ArrayList<String>(this._spout.getOutputFields().toList());
        fields.add(0, ID_FIELD);
        declarer.declareStream(this._streamName, new Fields(fields));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return this._spout.getComponentConfiguration();
    }

    @Override
    public void finishBatch(BatchInfo batchInfo) {
    }

    @Override
    public Object initBatchState(String batchGroup, Object batchId) {
        return null;
    }

    private static class AddIdCollector
    implements TridentCollector {
        BatchOutputCollector _delegate;
        Object _id;
        String _stream;

        public AddIdCollector(String stream, BatchOutputCollector c) {
            this._delegate = c;
            this._stream = stream;
        }

        public void setBatch(Object id) {
            this._id = id;
        }

        @Override
        public void emit(List<Object> values) {
            this._delegate.emit(this._stream, new ConsList(this._id, values));
        }

        @Override
        public void reportError(Throwable t) {
            this._delegate.reportError(t);
        }
    }
}

