package backtype.storm.transactional;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.transactional.state.RotatingTransactionalState;
import backtype.storm.transactional.state.TransactionalState;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.batch.util.BatchDef;
import java.math.BigInteger;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:backtype/storm/transactional/TransactionalSpoutCoordinator.class */
/**
 * Spout that coordinates transaction attempts for an {@link ITransactionalSpout}.
 *
 * <p>For every transaction id it emits a tuple on {@link #TRANSACTION_BATCH_STREAM_ID} and, once that
 * tuple has been acked and the transaction is the current one ({@code _currTransaction}), a tuple on
 * {@link #TRANSACTION_COMMIT_STREAM_ID}. When the commit tuple is acked the current transaction id is
 * advanced and stored in the {@link TransactionalState} under the {@code "currtx"} key.
 *
 * <p>{@link #nextTuple()}, {@link #ack(Object)} and {@link #fail(Object)} are {@code synchronized};
 * all of them end by calling the private {@code sync()} routine, which performs the actual emitting.
 */
public class TransactionalSpoutCoordinator extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(TransactionalSpoutCoordinator.class);

    /** Transaction id used when no current transaction id has been stored yet. */
    public static final BigInteger INIT_TXID = BigInteger.ONE;

    /** Stream on which batch tuples ({@code tx}, {@code tx-meta}, {@code committed-txid}) are emitted. */
    public static final String TRANSACTION_BATCH_STREAM_ID = TransactionalSpoutCoordinator.class.getName() + "/batch";

    /** Stream on which commit tuples ({@code tx}) are emitted. */
    public static final String TRANSACTION_COMMIT_STREAM_ID = TransactionalSpoutCoordinator.class.getName() + BatchDef.ZK_COMMIT_DIR;

    /** Key under which the current transaction id is stored in {@link #_state}. */
    private static final String CURRENT_TX = "currtx";

    /** Sub-directory handed to {@link RotatingTransactionalState} for per-transaction metadata. */
    private static final String META_DIR = "meta";

    private ITransactionalSpout _spout;
    private ITransactionalSpout.Coordinator _coordinator;
    private TransactionalState _state;
    private RotatingTransactionalState _coordinatorState;

    /** Attempts currently in flight, keyed (and therefore ordered) by transaction id. */
    TreeMap<BigInteger, TransactionStatus> _activeTx = new TreeMap<>();

    private SpoutOutputCollector _collector;
    private Random _rand;

    /** Id of the transaction that is next in line to be committed. */
    BigInteger _currTransaction;

    /** Upper bound on the number of entries in {@link #_activeTx}; read from the topology config in {@code open}. */
    int _maxTransactionActive;

    StateInitializer _initializer;

    /** Lifecycle of a single transaction attempt as tracked by the coordinator. */
    public enum AttemptStatus {
        /** Batch tuple emitted, not yet acked. */
        PROCESSING,
        /** Batch tuple acked, commit tuple not yet emitted. */
        PROCESSED,
        /** Commit tuple emitted, not yet acked. */
        COMMITTING
    }

    /**
     * Initializer passed to {@link RotatingTransactionalState#getState}; delegates to the wrapped
     * spout's coordinator.
     */
    public class StateInitializer implements RotatingTransactionalState.StateInitializer {
        private StateInitializer() {
        }

        /**
         * Delegates to {@code ITransactionalSpout.Coordinator.initializeTransaction}.
         *
         * @param bigInteger transaction id being initialized
         * @param obj        second argument forwarded unchanged to the coordinator
         * @return whatever the coordinator returns for this transaction
         */
        @Override
        public Object init(BigInteger bigInteger, Object obj) {
            return TransactionalSpoutCoordinator.this._coordinator.initializeTransaction(bigInteger, obj);
        }
    }

    /** Pairs an in-flight {@link TransactionAttempt} with its current {@link AttemptStatus}. */
    public static class TransactionStatus {
        TransactionAttempt attempt;
        AttemptStatus status = AttemptStatus.PROCESSING;

        /**
         * @param transactionAttempt the attempt to track; its status starts as {@link AttemptStatus#PROCESSING}
         */
        public TransactionStatus(TransactionAttempt transactionAttempt) {
            this.attempt = transactionAttempt;
        }

        /** @return the attempt's string form followed by {@code " <STATUS>"} */
        @Override
        public String toString() {
            return this.attempt.toString() + " <" + this.status.toString() + ">";
        }
    }

    /**
     * @param iTransactionalSpout the transactional spout whose coordinator will be driven
     */
    public TransactionalSpoutCoordinator(ITransactionalSpout iTransactionalSpout) {
        this._spout = iTransactionalSpout;
    }

    /** @return the transactional spout passed to the constructor */
    public ITransactionalSpout getSpout() {
        return this._spout;
    }

    /**
     * Sets up coordinator state, the wrapped coordinator, and the in-flight limit.
     *
     * <p>The in-flight limit is taken from {@code Config.TOPOLOGY_MAX_SPOUT_PENDING}; when that key is
     * absent the limit is 1. The current transaction id is read from the transactional state and
     * falls back to {@link #INIT_TXID} when nothing has been stored.
     *
     * @param map                  topology configuration
     * @param topologyContext      context handed on to {@code ITransactionalSpout.getCoordinator}
     * @param spoutOutputCollector collector used for all emits
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this._rand = new Random(Utils.secureRandomLong());
        this._state = TransactionalState.newCoordinatorState(map, (String) map.get(Config.TOPOLOGY_TRANSACTIONAL_ID), this._spout.getComponentConfiguration());
        this._coordinatorState = new RotatingTransactionalState(this._state, META_DIR, true);
        this._collector = spoutOutputCollector;
        this._coordinator = this._spout.getCoordinator(map, topologyContext);
        this._currTransaction = getStoredCurrTransaction(this._state);
        Object maxPending = map.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        if (maxPending == null) {
            this._maxTransactionActive = 1;
        } else {
            this._maxTransactionActive = Utils.getInt(maxPending).intValue();
        }
        this._initializer = new StateInitializer();
    }

    /** Closes the underlying {@link TransactionalState}. */
    @Override
    public void close() {
        this._state.close();
    }

    /** Emits any commit/batch tuples that are currently due; see {@code sync()}. */
    @Override
    public synchronized void nextTuple() {
        sync();
    }

    /**
     * Handles an ack for a batch or commit tuple.
     *
     * <p>Acks for unknown transactions or for an attempt other than the tracked one are ignored.
     * An ack while {@link AttemptStatus#PROCESSING} marks the attempt {@link AttemptStatus#PROCESSED};
     * an ack while {@link AttemptStatus#COMMITTING} retires the transaction, cleans up older
     * coordinator state, and advances and stores the current transaction id.
     *
     * @param obj the message id of the acked tuple; must be a {@link TransactionAttempt}
     */
    @Override
    public synchronized void ack(Object obj) {
        TransactionAttempt ackedAttempt = (TransactionAttempt) obj;
        BigInteger txid = ackedAttempt.getTransactionId();
        TransactionStatus tracked = this._activeTx.get(txid);
        if (tracked == null || !ackedAttempt.equals(tracked.attempt)) {
            return;
        }
        if (tracked.status == AttemptStatus.PROCESSING) {
            LOG.debug("acker batch stream {}", ackedAttempt);
            tracked.status = AttemptStatus.PROCESSED;
        } else if (tracked.status == AttemptStatus.COMMITTING) {
            LOG.debug("acker commit stream {}", ackedAttempt);
            this._activeTx.remove(txid);
            this._coordinatorState.cleanupBefore(txid);
            this._currTransaction = nextTransactionId(txid);
            this._state.setData(CURRENT_TX, this._currTransaction);
        }
        sync();
    }

    /**
     * Handles a failure of a batch or commit tuple.
     *
     * <p>A failure is acted on only when it belongs to the attempt currently tracked for its
     * transaction id. In that case the failed transaction and every active transaction with a
     * higher id are dropped from {@link #_activeTx}, and {@code sync()} is run so that new attempts
     * can be emitted. A failure of an attempt that is not tracked (unknown id, or an older attempt
     * that has since been replaced) leaves {@link #_activeTx} untouched.
     *
     * @param obj the message id of the failed tuple; must be a {@link TransactionAttempt}
     */
    @Override
    public synchronized void fail(Object obj) {
        TransactionAttempt failedAttempt = (TransactionAttempt) obj;
        BigInteger txid = failedAttempt.getTransactionId();
        // Look up without removing: a stale failure must not evict the live attempt's entry.
        TransactionStatus tracked = this._activeTx.get(txid);
        if (tracked == null || !failedAttempt.equals(tracked.attempt)) {
            return;
        }
        // tailMap(txid) includes txid itself, so this drops the failed transaction and all later ones.
        this._activeTx.tailMap(txid).clear();
        sync();
    }

    /**
     * Declares the batch stream ({@code tx}, {@code tx-meta}, {@code committed-txid}) and the
     * commit stream ({@code tx}).
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(TRANSACTION_BATCH_STREAM_ID, new Fields("tx", "tx-meta", "committed-txid"));
        outputFieldsDeclarer.declareStream(TRANSACTION_COMMIT_STREAM_ID, new Fields("tx"));
    }

    /**
     * Emits whatever is due given the current bookkeeping.
     *
     * <ol>
     *   <li>If the current transaction has been fully processed, marks it committing and emits its
     *       commit tuple.</li>
     *   <li>If fewer than {@code _maxTransactionActive} transactions are active, walks
     *       {@code _maxTransactionActive} consecutive ids starting at the current one and emits a
     *       batch tuple for each id that is not yet active and for which either cached metadata
     *       exists or the coordinator reports ready.</li>
     * </ol>
     *
     * <p>A {@link FailedException} thrown while doing step 2 is logged and ends the walk.
     */
    private void sync() {
        TransactionStatus maybeCommit = this._activeTx.get(this._currTransaction);
        if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            LOG.debug("send commit stream {}", maybeCommit);
            this._collector.emit(TRANSACTION_COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
        }
        try {
            // The size check is made once, before the walk; the walk itself is bounded by the loop count.
            if (this._activeTx.size() < this._maxTransactionActive) {
                BigInteger txid = this._currTransaction;
                for (int i = 0; i < this._maxTransactionActive; i++) {
                    if ((this._coordinatorState.hasCache(txid) || this._coordinator.isReady()) && !this._activeTx.containsKey(txid)) {
                        TransactionAttempt attempt = new TransactionAttempt(txid, this._rand.nextLong());
                        Object meta = this._coordinatorState.getState(txid, this._initializer);
                        this._activeTx.put(txid, new TransactionStatus(attempt));
                        LOG.debug("send batch stream {}", attempt);
                        // Third value is the id preceding _currTransaction (null for INIT_TXID), not the one preceding txid.
                        this._collector.emit(TRANSACTION_BATCH_STREAM_ID, new Values(attempt, meta, previousTransactionId(this._currTransaction)), attempt);
                    }
                    txid = nextTransactionId(txid);
                }
            }
        } catch (FailedException e) {
            LOG.warn("Failed to get metadata for a transaction", e);
        }
    }

    /** @return a fresh {@link Config} with max task parallelism set to 1 */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config config = new Config();
        config.setMaxTaskParallelism(1);
        return config;
    }

    /** @return {@code bigInteger + 1} */
    private BigInteger nextTransactionId(BigInteger bigInteger) {
        return bigInteger.add(BigInteger.ONE);
    }

    /** @return {@code bigInteger - 1}, or {@code null} when {@code bigInteger} equals {@link #INIT_TXID} */
    private BigInteger previousTransactionId(BigInteger bigInteger) {
        if (bigInteger.equals(INIT_TXID)) {
            return null;
        }
        return bigInteger.subtract(BigInteger.ONE);
    }

    /**
     * @param transactionalState state to read the {@code "currtx"} entry from
     * @return the stored current transaction id, or {@link #INIT_TXID} when none is stored
     */
    private BigInteger getStoredCurrTransaction(TransactionalState transactionalState) {
        BigInteger stored = (BigInteger) transactionalState.getData(CURRENT_TX);
        return stored == null ? INIT_TXID : stored;
    }
}
