package com.alibaba.jstorm.transactional.spout;

import backtype.storm.Config;
import backtype.storm.generated.StreamInfo;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.spout.SpoutOutputCollectorCb;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.client.spout.ICtrlMsgSpout;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.task.master.ctrlevent.TopoMasterCtrlEvent;
import com.alibaba.jstorm.transactional.BatchGroupId;
import com.alibaba.jstorm.transactional.TransactionCommon;
import com.alibaba.jstorm.transactional.TransactionOutputFieldsDeclarer;
import com.alibaba.jstorm.transactional.spout.TransactionSpoutOutputCollector;
import com.alibaba.jstorm.transactional.state.TransactionState;
import com.alibaba.jstorm.utils.IntervalCheck;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/transactional/spout/TransactionSpout.class */
public class TransactionSpout implements IRichSpout, ICtrlMsgSpout {
    private static final long serialVersionUID = 8289040804669685438L;
    public static Logger LOG = LoggerFactory.getLogger(TransactionSpout.class);
    protected Map conf;
    protected TopologyContext topologyContext;
    protected String topologyId;
    protected int taskId;
    protected int topologyMasterId;
    protected String componentId;
    protected Set<Integer> downstreamTasks;
    protected ITransactionSpoutExecutor spoutExecutor;
    protected TransactionSpoutOutputCollector outputCollector;
    protected volatile TransactionState.State spoutStatus;
    protected IntervalCheck initRetryCheck;
    protected int groupId;
    protected TransactionState currState;
    protected SortedSet<Long> committingBatches;
    protected volatile boolean isMaxPending;
    protected int MAX_PENDING_BATCH_NUM;
    protected int MAX_FAIL_RETRY;
    protected Lock lock;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/alibaba/jstorm/transactional/spout/TransactionSpout$Operation.class */
    public enum Operation {
        nextTuple,
        ctrlEvent,
        commit
    }

    public TransactionSpout(ITransactionSpoutExecutor iTransactionSpoutExecutor) {
        this.spoutExecutor = iTransactionSpoutExecutor;
    }

    @Override // backtype.storm.spout.ISpout
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.conf = map;
        this.topologyContext = topologyContext;
        this.topologyId = this.topologyContext.getTopologyId();
        this.taskId = this.topologyContext.getThisTaskId();
        this.topologyMasterId = this.topologyContext.getTopologyMasterId();
        this.componentId = this.topologyContext.getThisComponentId();
        this.groupId = TransactionCommon.groupIndex(topologyContext.getRawTopology(), this.componentId);
        this.downstreamTasks = TransactionCommon.getDownstreamTasks(this.componentId, this.topologyContext);
        this.outputCollector = new TransactionSpoutOutputCollector(spoutOutputCollector, this);
        this.spoutStatus = TransactionState.State.INIT;
        this.committingBatches = new TreeSet();
        this.isMaxPending = false;
        this.MAX_PENDING_BATCH_NUM = JStormUtils.parseInt(map.get("transaction.max.pending.batch"), 2).intValue();
        int intValue = JStormUtils.parseInt(map.get("transaction.spout.init.retry.secs"), JStormUtils.parseInt(map.get(Config.NIMBUS_TASK_LAUNCH_SECS)).intValue()).intValue();
        this.initRetryCheck = new IntervalCheck();
        this.initRetryCheck.setInterval(intValue);
        this.lock = new ReentrantLock(true);
        this.spoutExecutor.open(map, topologyContext, new SpoutOutputCollector((SpoutOutputCollectorCb) this.outputCollector));
    }

    @Override // backtype.storm.spout.ISpout
    public void close() {
        this.spoutExecutor.close();
    }

    @Override // backtype.storm.spout.ISpout
    public void activate() {
        this.spoutExecutor.activate();
    }

    @Override // backtype.storm.spout.ISpout
    public void deactivate() {
        this.spoutExecutor.deactivate();
    }

    @Override // backtype.storm.spout.ISpout
    public void nextTuple() {
        process(Operation.nextTuple, null);
    }

    @Override // backtype.storm.spout.ISpout
    public void ack(Object obj) {
        LOG.error("This ack method should not be called.");
    }

    @Override // backtype.storm.spout.ISpout
    public void fail(Object obj) {
        LOG.error("This fail method should not be called.");
    }

    @Override // backtype.storm.topology.IComponent
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(TransactionCommon.BARRIER_STREAM_ID, new Fields(TransactionCommon.BATCH_GROUP_ID_FIELD, TransactionCommon.BARRIER_SNAPSHOT_FIELD));
        TransactionOutputFieldsDeclarer transactionOutputFieldsDeclarer = new TransactionOutputFieldsDeclarer();
        this.spoutExecutor.declareOutputFields(transactionOutputFieldsDeclarer);
        for (Map.Entry<String, StreamInfo> entry : transactionOutputFieldsDeclarer.getFieldsDeclaration().entrySet()) {
            String key = entry.getKey();
            StreamInfo value = entry.getValue();
            outputFieldsDeclarer.declareStream(key, value.is_direct(), new Fields(value.get_output_fields()));
        }
    }

    @Override // backtype.storm.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        return this.spoutExecutor.getComponentConfiguration();
    }

    @Override // com.alibaba.jstorm.client.spout.ICtrlMsgSpout
    public void procCtrlMsg(TopoMasterCtrlEvent topoMasterCtrlEvent) {
        process(Operation.ctrlEvent, topoMasterCtrlEvent);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void process(Operation operation, Object obj) {
        try {
            this.lock.lock();
            if (operation.equals(Operation.nextTuple)) {
                doNextTuple();
            } else if (operation.equals(Operation.commit)) {
                commit();
            } else if (operation.equals(Operation.ctrlEvent)) {
                processCtrlEvent((TopoMasterCtrlEvent) obj);
            }
        } finally {
            this.lock.unlock();
        }
    }

    protected void doNextTuple() {
        if (!this.spoutStatus.equals(TransactionState.State.INIT)) {
            if (isActive()) {
                this.spoutExecutor.nextTuple();
                return;
            } else {
                JStormUtils.sleepMs(1L);
                return;
            }
        }
        if (!this.initRetryCheck.isStart()) {
            this.initRetryCheck.start();
            startInitState();
        } else if (this.initRetryCheck.check()) {
            startInitState();
        }
        JStormUtils.sleepMs(10L);
    }

    protected void processCtrlEvent(TopoMasterCtrlEvent topoMasterCtrlEvent) {
        TransactionState transactionState = null;
        switch (topoMasterCtrlEvent.getEventType()) {
            case transactionInitState:
                if (this.spoutStatus.equals(TransactionState.State.INIT)) {
                    if (topoMasterCtrlEvent.hasEventValue()) {
                        transactionState = (TransactionState) topoMasterCtrlEvent.getEventValue().get(0);
                    }
                    initSpoutState(transactionState);
                    this.spoutStatus = TransactionState.State.ACTIVE;
                    return;
                }
                return;
            case transactionRollback:
                this.spoutStatus = TransactionState.State.ROLLBACK;
                if (topoMasterCtrlEvent.hasEventValue()) {
                    transactionState = (TransactionState) topoMasterCtrlEvent.getEventValue().get(0);
                }
                rollbackSpoutState(transactionState);
                LOG.info("Rollback to state, {}", transactionState);
                JStormUtils.sleepMs(5000L);
                this.outputCollector.flushInitBarrier();
                return;
            case transactionCommit:
                removeSuccessBatch(((BatchGroupId) topoMasterCtrlEvent.getEventValue().get(0)).batchId);
                return;
            case transactionStop:
                this.spoutStatus = TransactionState.State.INACTIVE;
                LOG.info("Stop, current pending batches: {}", this.committingBatches);
                return;
            case transactionStart:
                this.spoutStatus = TransactionState.State.ACTIVE;
                LOG.info("Start, current pending batches: {}", this.committingBatches);
                return;
            default:
                LOG.warn("Received unsupported event, {}", topoMasterCtrlEvent.toString());
                return;
        }
    }

    public boolean isActive() {
        return !this.isMaxPending && this.spoutStatus.equals(TransactionState.State.ACTIVE);
    }

    protected void startInitState() {
        LOG.info("Start to retrieve the initial state from topology master");
        this.outputCollector.emitDirectByDelegate(this.topologyMasterId, Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, new Values(new TopoMasterCtrlEvent(TopoMasterCtrlEvent.EventType.transactionInitState)), null);
    }

    private void initSpoutState(TransactionState transactionState) {
        LOG.info("Initial state for spout: {}", transactionState);
        if (transactionState == null) {
            this.currState = new TransactionState(this.groupId, 0L, null, null);
        } else {
            this.currState = new TransactionState(transactionState);
        }
        this.spoutExecutor.initState(Utils.maybe_deserialize((byte[]) this.currState.getUserCheckpoint()));
        resetSpoutState();
    }

    private void rollbackSpoutState(TransactionState transactionState) {
        if (transactionState != null) {
            this.currState = new TransactionState(transactionState);
            this.spoutExecutor.rollBack(Utils.maybe_deserialize((byte[]) transactionState.getUserCheckpoint()));
        }
        resetSpoutState();
    }

    private void resetSpoutState() {
        this.committingBatches.clear();
        updateMaxPendingFlag();
        this.outputCollector.init(this.currState.getCurrBatchGroupId(), this.downstreamTasks);
        this.outputCollector.moveToNextBatch();
    }

    protected void commit() {
        if (isActive()) {
            Object finishBatch = this.spoutExecutor.finishBatch();
            TransactionSpoutOutputCollector.BatchInfo flushBarrier = this.outputCollector.flushBarrier();
            BatchGroupId batchGroupId = new BatchGroupId(this.groupId, flushBarrier.batchId);
            try {
                Object commit = this.spoutExecutor.commit(batchGroupId, finishBatch);
                if (commit == TransactionCommon.COMMIT_FAIL) {
                    LOG.warn("Failed to commit spout state for batch-{}", batchGroupId);
                    return;
                }
                this.currState.setBatchId(flushBarrier.batchId);
                this.currState.setUserCheckpoint(Utils.trySerialize(commit));
                this.committingBatches.add(Long.valueOf(flushBarrier.batchId));
                updateMaxPendingFlag();
                TopoMasterCtrlEvent topoMasterCtrlEvent = new TopoMasterCtrlEvent(TopoMasterCtrlEvent.EventType.transactionCommit);
                TransactionState transactionState = new TransactionState(this.currState);
                topoMasterCtrlEvent.addEventValue(transactionState.getCurrBatchGroupId());
                topoMasterCtrlEvent.addEventValue(transactionState);
                this.outputCollector.emitDirectByDelegate(this.topologyMasterId, Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, new Values(topoMasterCtrlEvent), null);
            } catch (Exception e) {
                LOG.warn("Failed to commit spout state for batch-{}, {}", batchGroupId, e);
            }
        }
    }

    private void removeSuccessBatch(long j) {
        TreeSet treeSet = new TreeSet((SortedSet) this.committingBatches.headSet(Long.valueOf(j)));
        if (treeSet.size() > 0) {
            LOG.info("Obsolete batcheIds are {}, successBatchId is {}", treeSet, Long.valueOf(j));
            this.committingBatches.removeAll(treeSet);
        }
        if (!this.committingBatches.remove(Long.valueOf(j))) {
            LOG.info("Batch-{} has alreay been removed", Long.valueOf(j));
        }
        updateMaxPendingFlag();
    }

    private void updateMaxPendingFlag() {
        if (this.MAX_PENDING_BATCH_NUM == 0) {
            this.isMaxPending = false;
        } else if (this.committingBatches.size() < this.MAX_PENDING_BATCH_NUM) {
            this.isMaxPending = false;
        } else {
            this.isMaxPending = true;
        }
    }
}
