/*
 * Decompiled with CFR 0.152.
 */
package storm.trident.topology;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.WindowedTimeThrottler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import storm.trident.topology.state.TransactionalState;

public class MasterBatchCoordinator
extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(MasterBatchCoordinator.class);
    public static final long INIT_TXID = 1L;
    public static final String BATCH_STREAM_ID = "$batch";
    public static final String COMMIT_STREAM_ID = "$commit";
    public static final String SUCCESS_STREAM_ID = "$success";
    private static final String CURRENT_TX = "currtx";
    private static final String CURRENT_ATTEMPTS = "currattempts";
    private List<TransactionalState> _states = new ArrayList<TransactionalState>();
    TreeMap<Long, TransactionStatus> _activeTx = new TreeMap();
    TreeMap<Long, Integer> _attemptIds;
    private SpoutOutputCollector _collector;
    Long _currTransaction;
    int _maxTransactionActive;
    List<ITridentSpout.BatchCoordinator> _coordinators = new ArrayList<ITridentSpout.BatchCoordinator>();
    List<String> _managedSpoutIds;
    List<ITridentSpout> _spouts;
    WindowedTimeThrottler _throttler;
    boolean _active = true;

    public MasterBatchCoordinator(List<String> spoutIds, List<ITridentSpout> spouts) {
        if (spoutIds.isEmpty()) {
            throw new IllegalArgumentException("Must manage at least one spout");
        }
        this._managedSpoutIds = spoutIds;
        this._spouts = spouts;
    }

    public List<String> getManagedSpoutIds() {
        return this._managedSpoutIds;
    }

    @Override
    public void activate() {
        this._active = true;
    }

    @Override
    public void deactivate() {
        this._active = false;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this._throttler = new WindowedTimeThrottler((Number)conf.get("topology.trident.batch.emit.interval.millis"), 1);
        for (String spoutId : this._managedSpoutIds) {
            this._states.add(TransactionalState.newCoordinatorState(conf, spoutId));
        }
        this._currTransaction = this.getStoredCurrTransaction();
        this._collector = collector;
        Number active = (Number)conf.get("topology.max.spout.pending");
        this._maxTransactionActive = active == null ? 1 : active.intValue();
        this._attemptIds = this.getStoredCurrAttempts(this._currTransaction, this._maxTransactionActive);
        for (int i = 0; i < this._spouts.size(); ++i) {
            String txId = this._managedSpoutIds.get(i);
            this._coordinators.add(this._spouts.get(i).getCoordinator(txId, conf, context));
        }
    }

    @Override
    public void close() {
        for (TransactionalState state : this._states) {
            state.close();
        }
    }

    @Override
    public void nextTuple() {
        this.sync();
    }

    @Override
    public void ack(Object msgId) {
        TransactionAttempt tx = (TransactionAttempt)msgId;
        TransactionStatus status = this._activeTx.get(tx.getTransactionId());
        if (status != null && tx.equals(status.attempt)) {
            if (status.status == AttemptStatus.PROCESSING) {
                status.status = AttemptStatus.PROCESSED;
            } else if (status.status == AttemptStatus.COMMITTING) {
                this._activeTx.remove(tx.getTransactionId());
                this._attemptIds.remove(tx.getTransactionId());
                this._collector.emit(SUCCESS_STREAM_ID, new Values(tx));
                this._currTransaction = this.nextTransactionId(tx.getTransactionId());
                for (TransactionalState state : this._states) {
                    state.setData(CURRENT_TX, this._currTransaction);
                }
            }
            this.sync();
        }
    }

    @Override
    public void fail(Object msgId) {
        TransactionAttempt tx = (TransactionAttempt)msgId;
        TransactionStatus stored = this._activeTx.remove(tx.getTransactionId());
        if (stored != null && tx.equals(stored.attempt)) {
            this._activeTx.tailMap(tx.getTransactionId()).clear();
            this.sync();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(BATCH_STREAM_ID, new Fields("tx"));
        declarer.declareStream(COMMIT_STREAM_ID, new Fields("tx"));
        declarer.declareStream(SUCCESS_STREAM_ID, new Fields("tx"));
    }

    private void sync() {
        TransactionStatus maybeCommit = this._activeTx.get(this._currTransaction);
        if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            this._collector.emit(COMMIT_STREAM_ID, (List<Object>)new Values(maybeCommit.attempt), maybeCommit.attempt);
        }
        if (this._active && this._activeTx.size() < this._maxTransactionActive) {
            Long curr = this._currTransaction;
            for (int i = 0; i < this._maxTransactionActive; ++i) {
                if (!this._activeTx.containsKey(curr) && this.isReady(curr)) {
                    Integer attemptId = this._attemptIds.get(curr);
                    if (attemptId == null) {
                        attemptId = 0;
                    } else {
                        Integer n = attemptId;
                        Integer n2 = attemptId = Integer.valueOf(attemptId + 1);
                    }
                    this._attemptIds.put(curr, attemptId);
                    for (TransactionalState state : this._states) {
                        state.setData(CURRENT_ATTEMPTS, this._attemptIds);
                    }
                    TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                    this._activeTx.put(curr, new TransactionStatus(attempt));
                    this._collector.emit(BATCH_STREAM_ID, (List<Object>)new Values(attempt), attempt);
                    this._throttler.markEvent();
                }
                curr = this.nextTransactionId(curr);
            }
        }
    }

    private boolean isReady(long txid) {
        if (this._throttler.isThrottled()) {
            return false;
        }
        for (ITridentSpout.BatchCoordinator coord : this._coordinators) {
            if (!coord.isReady(txid)) continue;
            return true;
        }
        return false;
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config ret = new Config();
        ret.setMaxTaskParallelism(1);
        ret.registerSerialization(TransactionAttempt.class);
        return ret;
    }

    private Long nextTransactionId(Long id) {
        return id + 1L;
    }

    private Long getStoredCurrTransaction() {
        Long ret = 1L;
        for (TransactionalState state : this._states) {
            Long curr = (Long)state.getData(CURRENT_TX);
            if (curr == null || curr.compareTo(ret) <= 0) continue;
            ret = curr;
        }
        return ret;
    }

    private TreeMap<Long, Integer> getStoredCurrAttempts(long currTransaction, int maxBatches) {
        TreeMap<Long, Integer> ret = new TreeMap<Long, Integer>();
        for (TransactionalState state : this._states) {
            HashMap attempts = (HashMap)state.getData(CURRENT_ATTEMPTS);
            if (attempts == null) {
                attempts = new HashMap();
            }
            for (Map.Entry e : attempts.entrySet()) {
                Number txidObj = e.getKey() instanceof String ? (Number)Long.parseLong((String)e.getKey()) : (Number)((Number)e.getKey());
                long txid = txidObj.longValue();
                int attemptId = ((Number)e.getValue()).intValue();
                Integer curr = ret.get(txid);
                if (curr != null && attemptId <= curr) continue;
                ret.put(txid, attemptId);
            }
        }
        ret.headMap(currTransaction).clear();
        ret.tailMap(currTransaction + (long)maxBatches - 1L).clear();
        return ret;
    }

    private static class TransactionStatus {
        TransactionAttempt attempt;
        AttemptStatus status;

        public TransactionStatus(TransactionAttempt attempt) {
            this.attempt = attempt;
            this.status = AttemptStatus.PROCESSING;
        }

        public String toString() {
            return this.attempt.toString() + " <" + this.status.toString() + ">";
        }
    }

    private static enum AttemptStatus {
        PROCESSING,
        PROCESSED,
        COMMITTING;

    }
}

