package com.alibaba.jstorm.transactional.bolt;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.StreamInfo;
import backtype.storm.serialization.SerializationFactory;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.OutputCollectorCb;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IProtoBatchBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImplExt;
import backtype.storm.tuple.Values;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
import com.alibaba.jstorm.task.master.ctrlevent.TopoMasterCtrlEvent;
import com.alibaba.jstorm.transactional.BatchCache;
import com.alibaba.jstorm.transactional.BatchGroupId;
import com.alibaba.jstorm.transactional.BatchSnapshot;
import com.alibaba.jstorm.transactional.TransactionCommon;
import com.alibaba.jstorm.transactional.TransactionOutputFieldsDeclarer;
import com.alibaba.jstorm.transactional.state.TransactionState;
import com.alibaba.jstorm.utils.Pair;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/transactional/bolt/TransactionBolt.class */
public class TransactionBolt implements IProtoBatchBolt {
    private static final long serialVersionUID = 7839725373060318309L;
    // Configuration map handed to prepare().
    protected Map conf;
    // Context handed to prepare(); source of the task/component/topology ids below.
    protected TopologyContext topologyContext;
    protected String topologyId;
    protected int taskId;
    protected String componentId;
    // Task ids resolved by TransactionCommon.getUpstreamTasks(); every new BatchTracker
    // waits for one barrier from each of them.
    protected Set<Integer> upstreamTasks;
    // Task ids resolved by TransactionCommon.getDownstreamTasks(); when empty, a finished
    // batch is acked to the topology master instead of flushing a barrier.
    protected Set<Integer> downstreamTasks;
    // Task id that all control events (init state, ack, rollback) are emitted to.
    protected int topologyMasterId;
    // Set to true by declareOutputFields() when the wrapped executor declares no streams.
    protected boolean isEndBolt = false;
    // User-supplied executor that receives the unwrapped data tuples.
    protected ITransactionBoltExecutor boltExecutor;
    // Wrapper around the collector passed to prepare(); also used to talk to the topology master.
    protected TransactionOutputCollector outputCollector;
    // INIT after prepare(), ACTIVE once the init state / init batch has been processed,
    // ROLLBACK after a rollback event. While not ACTIVE, tuples of non-zero batches are discarded.
    protected volatile TransactionState.State boltStatus;
    // Tracker of the batch whose tuples are currently being processed; null between batches.
    protected BatchTracker currentBatchTracker;
    // group index -> id of the last batch finished (or restored) for that group.
    protected ConcurrentHashMap<Integer, Long> lastSuccessfulBatch;
    // group index -> (batch id -> tracker) for batches that have started but not finished.
    protected HashMap<Integer, Map<Long, BatchTracker>> processingBatches;
    // Holds tuples of batches that cannot be processed yet (see cachePendingBatch /
    // getNextPendingTuples usages in this class).
    protected BatchCache batchCache;
    // Dictionary built from the system topology; resolves (component, stream) to a numeric stream id.
    protected SerializationFactory.IdDictionary streamIds;
    // Reusable Kryo input used by protoExecute() to peek into serialized tuples.
    protected Input kryoInput;
    // Numeric ids of all input streams (data streams plus upstream barrier streams).
    protected Set<Integer> inputStreamIds;
    public static Logger LOG = LoggerFactory.getLogger(TransactionBolt.class);
    // System topology, built on first prepare() and shared by all instances in this JVM.
    protected static StormTopology sysTopology = null;

    /* loaded from: input_file:com/alibaba/jstorm/transactional/bolt/TransactionBolt$BatchTracker.class */
    /**
     * Progress bookkeeping for a single batch handled by the enclosing bolt.
     *
     * <p>A tracker remembers which upstream tasks still owe a barrier, how many tuples the
     * barriers announced versus how many have arrived, and a per-downstream-task send counter.
     */
    public static class BatchTracker {
        private TransactionState state;
        // Sum of the tuple counts announced by the barriers received so far.
        private int expectedTupleCount;
        // Number of data tuples counted for this batch so far.
        private int receivedTupleCount;
        // Private copy of the batch identity; the (misspelled) name is referenced by the outer class.
        private BatchGroupId bactchGroupId = new BatchGroupId();
        // Upstream task ids whose barrier has not been received yet.
        private Set<Integer> expectedReceivedSnapshot = new HashSet<Integer>();
        // One counter per downstream task id.
        public HashMap<Integer, CountValue> sendMsgCount = new HashMap<>();

        /**
         * @param batchGroupId identity of the tracked batch; its values are copied into this tracker
         * @param set          task ids a barrier is expected from
         * @param set2         task ids a send counter is created for
         */
        public BatchTracker(BatchGroupId batchGroupId, Set<Integer> set, Set<Integer> set2) {
            setBatchGroupId(batchGroupId);
            this.expectedReceivedSnapshot.addAll(set);
            this.expectedTupleCount = 0;
            this.receivedTupleCount = 0;
            for (Integer downstreamTask : set2) {
                this.sendMsgCount.put(downstreamTask, new CountValue());
            }
            this.state = new TransactionState(batchGroupId.groupId, batchGroupId.batchId, null, null);
        }

        @Override
        public String toString() {
            return "bactchGroupId=" + this.bactchGroupId
                    + ", expectedReceivedSnapshot=" + this.expectedReceivedSnapshot
                    + ", expectedTupleCount=" + this.expectedTupleCount
                    + ", receivedTupleCount=" + this.receivedTupleCount
                    + ", sendMsgCount=" + this.sendMsgCount
                    + ", state=" + this.state;
        }

        /** @return true when the tracked group id is non-zero */
        public boolean isBatchInprogress() {
            return this.bactchGroupId.groupId != 0;
        }

        /** Copies the given identity into this tracker's own {@link BatchGroupId} instance. */
        public void setBatchGroupId(BatchGroupId batchGroupId) {
            this.bactchGroupId.setBatchGroupId(batchGroupId);
        }

        public BatchGroupId getBatchGroupId() {
            return this.bactchGroupId;
        }

        /** @return true once every expected task has delivered its barrier */
        public boolean isAllBarriersReceived() {
            return this.expectedReceivedSnapshot.isEmpty();
        }

        /** Marks the barrier of task {@code i} as received. */
        public void receiveBarrier(int i) {
            this.expectedReceivedSnapshot.remove(Integer.valueOf(i));
        }

        public TransactionState getState() {
            return this.state;
        }

        /**
         * @return true when all barriers have arrived and the received tuple count equals the
         *         count announced by those barriers
         */
        public boolean checkFinish() {
            return isAllBarriersReceived() && this.expectedTupleCount == this.receivedTupleCount;
        }

        public void incrementReceivedCount() {
            this.receivedTupleCount += 1;
        }

        public void incrementReceivedCount(int i) {
            this.receivedTupleCount = this.receivedTupleCount + i;
        }

        /**
         * Compiler-generated accessor kept because the outer class calls it: adds {@code i} to the
         * expected tuple count and returns the new total.
         */
        static /* synthetic */ int access$012(BatchTracker batchTracker, int i) {
            batchTracker.expectedTupleCount += i;
            return batchTracker.expectedTupleCount;
        }
    }

    /* loaded from: input_file:com/alibaba/jstorm/transactional/bolt/TransactionBolt$CountValue.class */
    /**
     * Mutable integer counter, used as the value type of {@code BatchTracker.sendMsgCount}.
     *
     * <p>Two instances are equal when their counts are equal. {@link #hashCode()} is derived
     * from the same field so the equals/hashCode contract holds; because {@code count} is
     * mutable, instances should not be used as hash keys while they are being modified.
     */
    public static class CountValue {
        public int count = 0;

        /** @return true when {@code obj} is a {@code CountValue} holding the same count */
        @Override
        public boolean equals(Object obj) {
            return (obj instanceof CountValue) && ((CountValue) obj).count == this.count;
        }

        /** @return a hash consistent with {@link #equals(Object)} (the count itself) */
        @Override
        public int hashCode() {
            return this.count;
        }

        /** @return the decimal representation of the current count */
        @Override
        public String toString() {
            return String.valueOf(this.count);
        }
    }

    /**
     * Creates a bolt that delegates tuple processing to the given executor.
     *
     * @param iTransactionBoltExecutor executor that is prepared, executed and cleaned up by this bolt
     */
    public TransactionBolt(ITransactionBoltExecutor iTransactionBoltExecutor) {
        boltExecutor = iTransactionBoltExecutor;
    }

    /**
     * Initializes the bolt: records topology identity, wraps the collector, prepares the
     * delegate executor, builds the per-group batch bookkeeping and the set of input stream
     * ids, and finally asks the topology master for the initial state.
     *
     * @param map             topology configuration
     * @param topologyContext context of this task
     * @param outputCollector collector to be wrapped by a {@code TransactionOutputCollector}
     * @throws RuntimeException if the system topology cannot be built
     */
    @Override // backtype.storm.task.IBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.conf = map;
        this.topologyContext = topologyContext;
        this.topologyId = this.topologyContext.getTopologyId();
        this.taskId = this.topologyContext.getThisTaskId();
        this.componentId = this.topologyContext.getThisComponentId();
        this.upstreamTasks = TransactionCommon.getUpstreamTasks(this.componentId, this.topologyContext);
        this.downstreamTasks = TransactionCommon.getDownstreamTasks(this.componentId, this.topologyContext);
        this.topologyMasterId = topologyContext.getTopologyMasterId();
        LOG.info("TransactionBolt: upstreamTasks=" + this.upstreamTasks + ", downstreamTasks=" + this.downstreamTasks);

        // The delegate executor emits through our transactional collector.
        this.outputCollector = new TransactionOutputCollector(this, outputCollector);
        OutputCollector executorCollector = new OutputCollector((OutputCollectorCb) this.outputCollector);
        this.boltExecutor.prepare(this.conf, topologyContext, executorCollector);
        this.boltStatus = TransactionState.State.INIT;

        // The system topology is built once and shared through the static field.
        if (sysTopology == null) {
            try {
                sysTopology = Common.system_topology(map, topologyContext.getRawTopology());
            } catch (InvalidTopologyException e) {
                LOG.error("Failed to build system topology", e);
                throw new RuntimeException(e);
            }
        }

        // One bookkeeping slot per group of the upstream spouts.
        this.lastSuccessfulBatch = new ConcurrentHashMap<>();
        this.processingBatches = new HashMap<>();
        Set<String> upstreamSpouts = TransactionCommon.getUpstreamSpouts(this.componentId, this.topologyContext);
        for (String spoutId : upstreamSpouts) {
            int groupIndex = TransactionCommon.groupIndex(this.topologyContext.getRawTopology(), spoutId);
            Integer groupKey = Integer.valueOf(groupIndex);
            this.lastSuccessfulBatch.put(groupKey, 0L);
            this.processingBatches.put(groupKey, new HashMap<Long, BatchTracker>());
        }

        this.batchCache = new BatchCache(topologyContext, upstreamSpouts, sysTopology);
        this.kryoInput = new Input(1);
        this.streamIds = new SerializationFactory.IdDictionary(sysTopology);

        // Collect numeric ids of every declared source stream ...
        this.inputStreamIds = new HashSet<Integer>();
        for (GlobalStreamId source : this.topologyContext.getThisSources().keySet()) {
            this.inputStreamIds.add(Integer.valueOf(this.streamIds.getStreamId(source.get_componentId(), source.get_streamId())));
        }
        // ... plus the barrier stream of every upstream component.
        Iterable<String> upstreamComponents = TransactionCommon.getUpstreamComponents(this.componentId, this.topologyContext);
        for (String upstreamComponent : upstreamComponents) {
            this.inputStreamIds.add(Integer.valueOf(this.streamIds.getStreamId(upstreamComponent, TransactionCommon.BARRIER_STREAM_ID)));
        }

        startInitState();
    }

    /**
     * Entry point for incoming tuples.
     *
     * <p>Control-stream tuples are routed to {@link #processTransactionEvent(Tuple)}. For every
     * other tuple the batch id is read from the first value; the tuple is then dropped (bolt
     * not active), parked in the batch cache (batch still pending), or processed immediately.
     *
     * @param tuple incoming tuple
     * @throws RuntimeException wrapping any exception raised while processing; the failure is
     *                          logged at ERROR level together with the offending tuple
     */
    @Override // backtype.storm.task.IBolt
    public void execute(Tuple tuple) {
        try {
            if (tuple.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) {
                processTransactionEvent(tuple);
                return;
            }

            // The first value is a Pair whose second element is a list starting with the batch id.
            Pair firstValue = (Pair) tuple.getValues().get(0);
            BatchGroupId batchGroupId = (BatchGroupId) ((List) firstValue.getSecond()).get(0);
            if (isDiscarded(batchGroupId)) {
                LOG.debug("Tuple was discarded. {}", tuple);
                return;
            }

            // A pending batch is cached instead of processed; if caching is refused we fall through.
            if (this.batchCache.isPendingBatch(batchGroupId, this.lastSuccessfulBatch)
                    && this.batchCache.cachePendingBatch(batchGroupId, tuple, this.lastSuccessfulBatch)) {
                return;
            }

            this.currentBatchTracker = getProcessingBatch(batchGroupId, true);
            this.outputCollector.setCurrBatchTracker(this.currentBatchTracker);
            processBatchTuple(tuple);
        } catch (Exception e) {
            // This failure is rethrown, so it must be visible at ERROR level rather than INFO.
            LOG.error("Failed to process input={}", tuple, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Handles a control event received from the topology master.
     *
     * <ul>
     *   <li>{@code transactionInitState}: restores state via {@link #initState} and then marks the bolt ACTIVE.</li>
     *   <li>{@code transactionRollback}: marks the bolt ROLLBACK and then calls {@link #rollback}.</li>
     *   <li>{@code transactionCommit}: forwards the batch id to {@link #ackCommit}.</li>
     *   <li>anything else: logged as a warning and ignored.</li>
     * </ul>
     *
     * @param tuple control tuple whose first value is a {@code TopoMasterCtrlEvent}
     */
    protected void processTransactionEvent(Tuple tuple) {
        TopoMasterCtrlEvent topoMasterCtrlEvent = (TopoMasterCtrlEvent) tuple.getValues().get(0);
        switch (topoMasterCtrlEvent.getEventType()) {
            case transactionInitState:
                initState((TransactionState) topoMasterCtrlEvent.getEventValue().get(0));
                this.boltStatus = TransactionState.State.ACTIVE;
                return;
            case transactionRollback:
                // Status flips before the rollback so tuples of non-zero batches are discarded from here on.
                this.boltStatus = TransactionState.State.ROLLBACK;
                rollback((TransactionState) topoMasterCtrlEvent.getEventValue().get(0));
                return;
            case transactionCommit:
                ackCommit((BatchGroupId) topoMasterCtrlEvent.getEventValue().get(0));
                return;
            default:
                // Pass the event as a logging argument; concatenating it left a literal "{}" in the output.
                LOG.warn("Received unexpected transaction event, {}", topoMasterCtrlEvent);
                return;
        }
    }

    /**
     * Makes the given batch the current one and feeds all supplied tuples through
     * {@link #processBatchTuple(Tuple)} in list order.
     *
     * @param batchGroupId batch the tuples belong to
     * @param list         tuples to process
     */
    public void processBatch(BatchGroupId batchGroupId, List<Tuple> list) {
        BatchTracker tracker = getProcessingBatch(batchGroupId, true);
        this.currentBatchTracker = tracker;
        this.outputCollector.setCurrBatchTracker(this.currentBatchTracker);
        for (Tuple pending : list) {
            processBatchTuple(pending);
        }
    }

    /**
     * Processes one tuple of the current batch.
     *
     * <p>A barrier tuple registers the sender's barrier and adds the announced tuple count to
     * the tracker. Any other tuple is unpacked: each value is a {@code Pair} whose second
     * element is a list; the leading element of that list is removed and the remainder is
     * handed to the delegate executor. Afterwards the batch is finished if the tracker reports
     * completion.
     *
     * @param tuple barrier or data tuple belonging to {@code currentBatchTracker}
     */
    public void processBatchTuple(Tuple tuple) {
        String streamId = tuple.getSourceStreamId();
        boolean isBarrier = streamId.equals(TransactionCommon.BARRIER_STREAM_ID);
        if (!isBarrier) {
            for (Object value : tuple.getValues()) {
                Pair entry = (Pair) value;
                // Strip the leading element before the executor sees the payload.
                ((List) entry.getSecond()).remove(0);
                this.boltExecutor.execute(new TupleImplExt(this.topologyContext, (List<Object>) entry.getSecond(), (MessageId) entry.getFirst(), (TupleImplExt) tuple));
            }
            this.currentBatchTracker.incrementReceivedCount(tuple.getValues().size());
        } else {
            List barrierValues = (List) ((Pair) tuple.getValue(0)).getSecond();
            BatchSnapshot snapshot = (BatchSnapshot) barrierValues.get(1);
            this.currentBatchTracker.receiveBarrier(tuple.getSourceTask());
            BatchTracker.access$012(this.currentBatchTracker, snapshot.getTupleCount());
            LOG.debug("Received batch, stream={}, batchGroupId={}, sourceTask={}, values={}", new Object[]{streamId, this.currentBatchTracker.getBatchGroupId(), Integer.valueOf(tuple.getSourceTask()), snapshot});
            LOG.debug("currentBatchTracker={}, processingBatches={}, pendingBatches={}", new Object[]{this.currentBatchTracker, this.processingBatches, this.batchCache});
        }

        boolean batchComplete = this.currentBatchTracker.checkFinish();
        if (batchComplete) {
            finishCurrentBatch();
        }
    }

    /**
     * Cleans up the delegate executor and the batch cache.
     *
     * <p>The batch cache is cleaned up even when the executor's cleanup throws, and is skipped
     * when it was never created (other code in this class also treats {@code batchCache} as
     * possibly null). An exception from the executor still propagates to the caller.
     */
    @Override // backtype.storm.task.IBolt
    public void cleanup() {
        try {
            this.boltExecutor.cleanup();
        } finally {
            if (this.batchCache != null) {
                this.batchCache.cleanup();
            }
        }
    }

    /**
     * Re-declares every stream of the delegate executor on the real declarer. If the executor
     * declared at least one stream, the barrier stream is declared as well; otherwise this
     * bolt is flagged as an end bolt.
     *
     * @param outputFieldsDeclarer declarer supplied by the topology builder
     */
    @Override // backtype.storm.topology.IComponent
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        TransactionOutputFieldsDeclarer executorDeclarer = new TransactionOutputFieldsDeclarer();
        this.boltExecutor.declareOutputFields(executorDeclarer);
        Map<String, StreamInfo> declaredStreams = executorDeclarer.getFieldsDeclaration();

        for (Map.Entry<String, StreamInfo> declared : declaredStreams.entrySet()) {
            String streamId = declared.getKey();
            StreamInfo info = declared.getValue();
            outputFieldsDeclarer.declareStream(streamId, info.is_direct(), new Fields(info.get_output_fields()));
        }

        if (declaredStreams.isEmpty()) {
            this.isEndBolt = true;
            return;
        }
        outputFieldsDeclarer.declareStream(TransactionCommon.BARRIER_STREAM_ID, new Fields(TransactionCommon.BATCH_GROUP_ID_FIELD, TransactionCommon.BARRIER_SNAPSHOT_FIELD));
    }

    /**
     * @return the component configuration reported by the delegate executor, unchanged
     */
    @Override // backtype.storm.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> executorConfiguration = this.boltExecutor.getComponentConfiguration();
        return executorConfiguration;
    }

    /**
     * Looks up the tracker of an in-flight batch.
     *
     * @param batchGroupId batch to look up
     * @param z            when true, a missing tracker is created and registered
     * @return the tracker; null when the group is not registered, or when the batch has no
     *         tracker and {@code z} is false
     */
    private BatchTracker getProcessingBatch(BatchGroupId batchGroupId, boolean z) {
        Map<Long, BatchTracker> groupBatches = this.processingBatches.get(Integer.valueOf(batchGroupId.groupId));
        if (groupBatches == null) {
            return null;
        }
        BatchTracker existing = groupBatches.get(Long.valueOf(batchGroupId.batchId));
        if (existing != null || !z) {
            return existing;
        }
        BatchTracker created = new BatchTracker(batchGroupId, this.upstreamTasks, this.downstreamTasks);
        groupBatches.put(Long.valueOf(batchGroupId.batchId), created);
        return created;
    }

    /**
     * Decides whether a tuple of the given batch must be dropped.
     *
     * @param batchGroupId batch the tuple belongs to
     * @return false for batch id 0 or while the bolt is ACTIVE; true otherwise
     */
    protected boolean isDiscarded(BatchGroupId batchGroupId) {
        boolean accepted = batchGroupId.batchId == 0 || this.boltStatus.equals(TransactionState.State.ACTIVE);
        if (!accepted) {
            LOG.debug("Bolt is not active, state={}, tuple is going to be discarded", this.boltStatus.toString());
        }
        return !accepted;
    }

    /**
     * Sends a {@code transactionInitState} control event to the topology master to request the
     * initial state of this bolt.
     */
    protected void startInitState() {
        LOG.info("Start to retrieve initialize state from topology master");
        TopoMasterCtrlEvent initRequest = new TopoMasterCtrlEvent(TopoMasterCtrlEvent.EventType.transactionInitState);
        Values payload = new Values(initRequest);
        this.outputCollector.emitDirectByDelegate(this.topologyMasterId, Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, null, payload);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records the batch carried by the given state as the last successful batch of its group.
     *
     * @param transactionState state delivered by the topology master
     */
    public void initState(TransactionState transactionState) {
        BatchGroupId restored = transactionState.getCurrBatchGroupId();
        Integer groupKey = Integer.valueOf(restored.groupId);
        Long batchValue = Long.valueOf(restored.batchId);
        this.lastSuccessfulBatch.put(groupKey, batchValue);
        LOG.info("Init: state={}, lastSuccessfulBatch={}", transactionState, this.lastSuccessfulBatch);
    }

    /**
     * Hook called by finishCurrentBatch() when a batch with a non-zero batch id has completed,
     * before that batch is recorded as the last successful one. Empty in this class;
     * presumably meant to be overridden by subclasses - confirm against the subclasses.
     */
    protected void commit() {
    }

    /**
     * Hook called by processTransactionEvent() when a transactionCommit event arrives from the
     * topology master. Empty in this class; presumably meant to be overridden by subclasses -
     * confirm against the subclasses.
     *
     * @param batchGroupId batch id carried by the commit event
     */
    protected void ackCommit(BatchGroupId batchGroupId) {
    }

    /**
     * Rolls the group of the given state back: the carried batch becomes the last successful
     * batch of the group and all buffered data of that group is dropped.
     *
     * @param transactionState state to roll back to
     */
    protected void rollback(TransactionState transactionState) {
        BatchGroupId target = transactionState.getCurrBatchGroupId();
        LOG.info("Start to rollback to batch-{}, currentBatchStateus={}", target, currentBatchStatusInfo());
        Integer groupKey = Integer.valueOf(target.groupId);
        this.lastSuccessfulBatch.put(groupKey, Long.valueOf(target.batchId));
        int groupToClean = transactionState.getCurrBatchGroupId().groupId;
        cleanupBuffer(groupToClean);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Drops all in-flight and cached data of one group.
     *
     * <p>Clears the current tracker if it belongs to the group, empties the group's map of
     * processing batches and asks the batch cache to clean up the group. A group that was never
     * registered in {@code processingBatches} is tolerated (a warning is logged) instead of
     * failing with a {@code NullPointerException}; the cache cleanup still runs.
     *
     * @param i group id whose buffers are discarded
     */
    public void cleanupBuffer(int i) {
        if (this.currentBatchTracker != null && this.currentBatchTracker.getBatchGroupId().groupId == i) {
            this.currentBatchTracker = null;
        }
        // Same null-check as getProcessingBatch(): only groups registered in prepare() have a map.
        Map<Long, BatchTracker> groupBatches = this.processingBatches.get(Integer.valueOf(i));
        if (groupBatches != null) {
            groupBatches.clear();
        } else {
            LOG.warn("cleanupBuffer: no processing batches registered for group {}", Integer.valueOf(i));
        }
        this.batchCache.cleanup(i);
        LOG.info("cleanupBuffer: processingBatches={}, pendingBatches={}, currentBatchTracker={}", new Object[]{this.processingBatches, this.batchCache, this.currentBatchTracker});
    }

    /**
     * Reports a finished batch to the topology master with a {@code transactionAck} event.
     *
     * @param batchGroupId batch being acknowledged
     */
    private void ackBatch(BatchGroupId batchGroupId) {
        TopoMasterCtrlEvent ackEvent = new TopoMasterCtrlEvent(TopoMasterCtrlEvent.EventType.transactionAck);
        ackEvent.addEventValue(batchGroupId);
        Values payload = new Values(ackEvent);
        this.outputCollector.emitDirectByDelegate(this.topologyMasterId, Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, null, payload);
    }

    /**
     * Removes the tracker of the given batch from the group's processing map.
     *
     * @param batchGroupId batch whose tracker is removed
     */
    private void removeProcessingBatch(BatchGroupId batchGroupId) {
        Map<Long, BatchTracker> groupBatches = this.processingBatches.get(Integer.valueOf(batchGroupId.groupId));
        groupBatches.remove(Long.valueOf(batchGroupId.batchId));
    }

    /**
     * Completes the batch tracked by currentBatchTracker and then tries to continue with the
     * next cached batch.
     *
     * Statement order matters: the batch id is captured first because cleanupBuffer() and the
     * later assignment both null out currentBatchTracker. The method is re-entrant through
     * processBatch() -> processBatchTuple() -> finishCurrentBatch() when a cached batch
     * completes immediately.
     */
    private void finishCurrentBatch() {
        BatchGroupId batchGroupId = this.currentBatchTracker.bactchGroupId;
        if (batchGroupId.batchId == 0) {
            // Batch 0 carries the init events: drop the group's buffers and activate the bolt.
            // lastSuccessfulBatch is not touched on this path.
            LOG.info("Received all init events");
            cleanupBuffer(batchGroupId.groupId);
            this.boltStatus = TransactionState.State.ACTIVE;
        } else {
            // Regular batch: run the commit hook, then record it as the group's last successful batch.
            commit();
            this.lastSuccessfulBatch.put(Integer.valueOf(batchGroupId.groupId), Long.valueOf(batchGroupId.batchId));
        }
        // Without downstream tasks the batch is acked to the topology master; otherwise the
        // collector flushes the barrier (presumably forwarding it downstream - confirm in
        // TransactionOutputCollector).
        if (this.downstreamTasks.size() == 0) {
            ackBatch(batchGroupId);
        } else {
            this.outputCollector.flushBarrier();
        }
        removeProcessingBatch(batchGroupId);
        this.currentBatchTracker = null;
        LOG.debug("finishCurrentBatch, {}", currentBatchStatusInfo());
        // Ask the cache for tuples that became processable; they are processed under the id
        // (same group, batchId + 1). NOTE(review): this assumes the cache returns tuples of
        // exactly that batch - confirm against BatchCache.getNextPendingTuples.
        List<Tuple> nextPendingTuples = this.batchCache.getNextPendingTuples(this.lastSuccessfulBatch);
        if (nextPendingTuples != null) {
            processBatch(new BatchGroupId(batchGroupId.groupId, batchGroupId.batchId + 1), nextPendingTuples);
        }
    }

    /**
     * Requests a rollback of the given batch by sending a {@code transactionRollback} event to
     * the topology master.
     *
     * @param batchGroupId batch that failed
     */
    public void fail(BatchGroupId batchGroupId) {
        TopoMasterCtrlEvent rollbackEvent = new TopoMasterCtrlEvent(TopoMasterCtrlEvent.EventType.transactionRollback);
        rollbackEvent.addEventValue(batchGroupId);
        Values payload = new Values(rollbackEvent);
        this.outputCollector.emitDirectByDelegate(this.topologyMasterId, Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, null, payload);
    }

    /**
     * Pre-filters a serialized message before it is deserialized.
     *
     * <p>Outside exactly-once mode (or before the batch cache exists) nothing is done and null
     * is returned. Otherwise the message header is peeked at: messages of a known input stream
     * whose batch is non-zero, still pending and accepted by the batch cache are swallowed
     * (empty list); everything else is returned unchanged as a single-element list.
     *
     * <p>NOTE(review): the start offset 13 and the skipped var-ints presumably mirror the
     * serialized tuple header layout - confirm against the tuple serializer.
     *
     * @param bArr serialized message
     * @return null when filtering is not applicable; otherwise a list that is empty when the
     *         message was cached, or contains {@code bArr} when it should be processed now
     */
    @Override // backtype.storm.topology.IProtoBatchBolt
    public List<byte[]> protoExecute(byte[] bArr) {
        boolean exactlyOnce = this.batchCache != null && this.batchCache.isExactlyOnceMode();
        if (!exactlyOnce) {
            return null;
        }

        List<byte[]> passThrough = new ArrayList<>();
        if (this.kryoInput == null) {
            passThrough.add(bArr);
            return passThrough;
        }

        this.kryoInput.setBuffer(bArr);
        this.kryoInput.setPosition(13);
        this.kryoInput.readInt(true);
        int streamId = this.kryoInput.readInt(true);
        if (!this.inputStreamIds.contains(Integer.valueOf(streamId))) {
            passThrough.add(bArr);
            return passThrough;
        }

        // Skip four var-ints that precede the batch identity.
        for (int skipped = 0; skipped < 4; skipped++) {
            this.kryoInput.readInt(true);
        }
        int groupId = this.kryoInput.readInt(true);
        long batchId = this.kryoInput.readLong(true);
        BatchGroupId batchGroupId = new BatchGroupId(groupId, batchId);

        // Short-circuit order matches the original if/else-if ladder.
        boolean cached = batchGroupId.batchId != 0
                && this.batchCache.isPendingBatch(batchGroupId, this.lastSuccessfulBatch)
                && this.batchCache.cachePendingBatch(batchGroupId, bArr, this.lastSuccessfulBatch);
        if (!cached) {
            passThrough.add(bArr);
        }
        return passThrough;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return a one-line summary of the processing batches, the batch cache and the last
     *         successful batch per group, intended for log messages
     */
    public String currentBatchStatusInfo() {
        StringBuilder info = new StringBuilder();
        info.append("processingBatches=").append(this.processingBatches);
        info.append(", pendingBatches=").append(this.batchCache);
        info.append(", lastSuccessfulBatch=").append(this.lastSuccessfulBatch);
        return info.toString();
    }
}
