/*
 * Decompiled with CFR 0.152.
 */
package storm.trident.spout;

import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import storm.trident.topology.state.RotatingTransactionalState;
import storm.trident.topology.state.TransactionalState;

public class TridentSpoutCoordinator
implements IBasicBolt {
    public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutCoordinator.class);
    private static final String META_DIR = "meta";
    ITridentSpout _spout;
    ITridentSpout.BatchCoordinator _coord;
    RotatingTransactionalState _state;
    TransactionalState _underlyingState;
    String _id;

    public TridentSpoutCoordinator(String id, ITridentSpout spout) {
        this._spout = spout;
        this._id = id;
    }

    @Override
    public void prepare(Map conf, TopologyContext context) {
        this._coord = this._spout.getCoordinator(this._id, conf, context);
        this._underlyingState = TransactionalState.newCoordinatorState(conf, this._id);
        this._state = new RotatingTransactionalState(this._underlyingState, META_DIR);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        TransactionAttempt attempt = (TransactionAttempt)tuple.getValue(0);
        if (tuple.getSourceStreamId().equals("$success")) {
            this._state.cleanupBefore(attempt.getTransactionId());
            this._coord.success(attempt.getTransactionId());
        } else {
            long txid = attempt.getTransactionId();
            Object prevMeta = this._state.getPreviousState(txid);
            Object meta = this._coord.initializeTransaction(txid, prevMeta, this._state.getState(txid));
            this._state.overrideState(txid, meta);
            collector.emit("$batch", new Values(attempt, meta));
        }
    }

    @Override
    public void cleanup() {
        this._coord.close();
        this._underlyingState.close();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("$batch", new Fields("tx", "metadata"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config ret = new Config();
        ret.setMaxTaskParallelism(1);
        return ret;
    }
}

