/*
 * Decompiled with CFR 0.152.
 */
package backtype.storm.transactional;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.transactional.state.RotatingTransactionalState;
import backtype.storm.transactional.state.TransactionalState;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.math.BigInteger;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionalSpoutCoordinator
extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(TransactionalSpoutCoordinator.class);
    public static final BigInteger INIT_TXID = BigInteger.ONE;
    public static final String TRANSACTION_BATCH_STREAM_ID = TransactionalSpoutCoordinator.class.getName() + "/batch";
    public static final String TRANSACTION_COMMIT_STREAM_ID = TransactionalSpoutCoordinator.class.getName() + "/commit";
    private static final String CURRENT_TX = "currtx";
    private static final String META_DIR = "meta";
    private ITransactionalSpout _spout;
    private ITransactionalSpout.Coordinator _coordinator;
    private TransactionalState _state;
    private RotatingTransactionalState _coordinatorState;
    TreeMap<BigInteger, TransactionStatus> _activeTx = new TreeMap();
    private SpoutOutputCollector _collector;
    private Random _rand;
    BigInteger _currTransaction;
    int _maxTransactionActive;
    StateInitializer _initializer;

    public TransactionalSpoutCoordinator(ITransactionalSpout spout) {
        this._spout = spout;
    }

    public ITransactionalSpout getSpout() {
        return this._spout;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this._rand = new Random(Utils.secureRandomLong());
        this._state = TransactionalState.newCoordinatorState(conf, (String)conf.get("topology.transactional.id"), this._spout.getComponentConfiguration());
        this._coordinatorState = new RotatingTransactionalState(this._state, META_DIR, true);
        this._collector = collector;
        this._coordinator = this._spout.getCoordinator(conf, context);
        this._currTransaction = this.getStoredCurrTransaction(this._state);
        Object active = conf.get("topology.max.spout.pending");
        this._maxTransactionActive = active == null ? 1 : Utils.getInt(active);
        this._initializer = new StateInitializer();
    }

    @Override
    public void close() {
        this._state.close();
    }

    @Override
    public void nextTuple() {
        this.sync();
    }

    @Override
    public void ack(Object msgId) {
        TransactionAttempt tx = (TransactionAttempt)msgId;
        TransactionStatus status = this._activeTx.get(tx.getTransactionId());
        if (status != null && tx.equals(status.attempt)) {
            if (status.status == AttemptStatus.PROCESSING) {
                status.status = AttemptStatus.PROCESSED;
            } else if (status.status == AttemptStatus.COMMITTING) {
                this._activeTx.remove(tx.getTransactionId());
                this._coordinatorState.cleanupBefore(tx.getTransactionId());
                this._currTransaction = this.nextTransactionId(tx.getTransactionId());
                this._state.setData(CURRENT_TX, this._currTransaction);
            }
            this.sync();
        }
    }

    @Override
    public void fail(Object msgId) {
        TransactionAttempt tx = (TransactionAttempt)msgId;
        TransactionStatus stored = this._activeTx.remove(tx.getTransactionId());
        if (stored != null && tx.equals(stored.attempt)) {
            this._activeTx.tailMap(tx.getTransactionId()).clear();
            this.sync();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(TRANSACTION_BATCH_STREAM_ID, new Fields("tx", "tx-meta", "committed-txid"));
        declarer.declareStream(TRANSACTION_COMMIT_STREAM_ID, new Fields("tx"));
    }

    private void sync() {
        TransactionStatus maybeCommit = this._activeTx.get(this._currTransaction);
        if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            this._collector.emit(TRANSACTION_COMMIT_STREAM_ID, (List<Object>)new Values(maybeCommit.attempt), maybeCommit.attempt);
        }
        try {
            if (this._activeTx.size() < this._maxTransactionActive) {
                BigInteger curr = this._currTransaction;
                for (int i = 0; i < this._maxTransactionActive; ++i) {
                    if ((this._coordinatorState.hasCache(curr) || this._coordinator.isReady()) && !this._activeTx.containsKey(curr)) {
                        TransactionAttempt attempt = new TransactionAttempt(curr, this._rand.nextLong());
                        Object state = this._coordinatorState.getState(curr, this._initializer);
                        this._activeTx.put(curr, new TransactionStatus(attempt));
                        this._collector.emit(TRANSACTION_BATCH_STREAM_ID, (List<Object>)new Values(attempt, state, this.previousTransactionId(this._currTransaction)), attempt);
                    }
                    curr = this.nextTransactionId(curr);
                }
            }
        }
        catch (FailedException e) {
            LOG.warn("Failed to get metadata for a transaction", (Throwable)e);
        }
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config ret = new Config();
        ret.setMaxTaskParallelism(1);
        return ret;
    }

    private BigInteger nextTransactionId(BigInteger id) {
        return id.add(BigInteger.ONE);
    }

    private BigInteger previousTransactionId(BigInteger id) {
        if (id.equals(INIT_TXID)) {
            return null;
        }
        return id.subtract(BigInteger.ONE);
    }

    private BigInteger getStoredCurrTransaction(TransactionalState state) {
        BigInteger ret = (BigInteger)state.getData(CURRENT_TX);
        if (ret == null) {
            return INIT_TXID;
        }
        return ret;
    }

    private class StateInitializer
    implements RotatingTransactionalState.StateInitializer {
        private StateInitializer() {
        }

        @Override
        public Object init(BigInteger txid, Object lastState) {
            return TransactionalSpoutCoordinator.this._coordinator.initializeTransaction(txid, lastState);
        }
    }

    private static class TransactionStatus {
        TransactionAttempt attempt;
        AttemptStatus status;

        public TransactionStatus(TransactionAttempt attempt) {
            this.attempt = attempt;
            this.status = AttemptStatus.PROCESSING;
        }

        public String toString() {
            return this.attempt.toString() + " <" + this.status.toString() + ">";
        }
    }

    private static enum AttemptStatus {
        PROCESSING,
        PROCESSED,
        COMMITTING;

    }
}

