package com.alibaba.jstorm.batch.impl;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.ICollectorCallback;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.alibaba.jstorm.batch.BatchId;
import com.alibaba.jstorm.batch.util.BatchCommon;
import com.alibaba.jstorm.batch.util.BatchDef;
import com.alibaba.jstorm.batch.util.BatchStatus;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.ClusterState;
import com.alibaba.jstorm.utils.IntervalCheck;
import com.alibaba.jstorm.utils.JStormUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.class */
public class BatchSpoutTrigger implements IRichSpout {
    private static final long serialVersionUID = 7215109169247425954L;
    private LinkedBlockingQueue<BatchSpoutMsgId> batchQueue;
    private transient ClusterState zkClient;
    private transient SpoutOutputCollector collector;
    private static final String ZK_NODE_PATH = "/trigger";
    private Map conf;
    private String taskName;
    private IntervalCheck intervalCheck;
    private static final Logger LOG = LoggerFactory.getLogger(BatchSpoutTrigger.class);
    private static BatchId currentBatchId = null;

    /* loaded from: input_file:com/alibaba/jstorm/batch/impl/BatchSpoutTrigger$EmitCb.class */
    class EmitCb implements ICollectorCallback {
        private BatchSpoutMsgId msgId;

        public EmitCb(BatchSpoutMsgId batchSpoutMsgId) {
            this.msgId = batchSpoutMsgId;
        }

        @Override // backtype.storm.task.ICollectorCallback
        public void execute(List<Integer> list) {
            if (list.isEmpty()) {
                BatchSpoutTrigger.this.forward(this.msgId);
            }
        }
    }

    public void initMsgId() throws Exception {
        Long l = null;
        byte[] bArr = this.zkClient.get_data(ZK_NODE_PATH, false);
        if (bArr != null) {
            try {
                l = Long.valueOf(new String(bArr));
                LOG.info("ZK msgId:" + l);
            } catch (Exception e) {
                LOG.warn("Failed to get msgId ", e);
            }
        }
        if (l != null) {
            BatchId.updateId(l.longValue());
        }
        int intValue = JStormUtils.parseInt(this.conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 1).intValue();
        for (int i = 0; i < intValue; i++) {
            BatchSpoutMsgId mkInstance = BatchSpoutMsgId.mkInstance();
            if (currentBatchId == null) {
                currentBatchId = mkInstance.getBatchId();
            }
            this.batchQueue.offer(mkInstance);
            LOG.info("Push into queue," + mkInstance);
        }
    }

    @Override // backtype.storm.spout.ISpout
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.batchQueue = new LinkedBlockingQueue<>();
        this.collector = spoutOutputCollector;
        this.conf = map;
        this.taskName = topologyContext.getThisComponentId() + "_" + topologyContext.getThisTaskId();
        this.intervalCheck = new IntervalCheck();
        try {
            this.zkClient = BatchCommon.getZkClient(map);
            initMsgId();
            LOG.info("Successfully open " + this.taskName);
        } catch (Exception e) {
            LOG.error("", e);
            throw new RuntimeException("Failed to init");
        }
    }

    @Override // backtype.storm.spout.ISpout
    public void close() {
        this.zkClient.close();
    }

    @Override // backtype.storm.spout.ISpout
    public void activate() {
        LOG.info("Activate " + this.taskName);
    }

    @Override // backtype.storm.spout.ISpout
    public void deactivate() {
        LOG.info("Deactivate " + this.taskName);
    }

    protected String getStreamId(BatchStatus batchStatus) {
        if (batchStatus == BatchStatus.COMPUTING) {
            return BatchDef.COMPUTING_STREAM_ID;
        }
        if (batchStatus == BatchStatus.PREPARE_COMMIT) {
            return BatchDef.PREPARE_STREAM_ID;
        }
        if (batchStatus == BatchStatus.COMMIT) {
            return BatchDef.COMMIT_STREAM_ID;
        }
        if (batchStatus == BatchStatus.POST_COMMIT) {
            return BatchDef.POST_STREAM_ID;
        }
        if (batchStatus == BatchStatus.REVERT_COMMIT) {
            return BatchDef.REVERT_STREAM_ID;
        }
        LOG.error("Occur unkonw type BatchStatus " + batchStatus);
        throw new RuntimeException();
    }

    protected boolean isCommitStatus(BatchStatus batchStatus) {
        return batchStatus == BatchStatus.COMMIT || batchStatus == BatchStatus.REVERT_COMMIT;
    }

    protected boolean isCommitWait(BatchSpoutMsgId batchSpoutMsgId) {
        return isCommitStatus(batchSpoutMsgId.getBatchStatus()) && currentBatchId.getId() < batchSpoutMsgId.getBatchId().getId();
    }

    @Override // backtype.storm.spout.ISpout
    public void nextTuple() {
        BatchSpoutMsgId batchSpoutMsgId = null;
        try {
            batchSpoutMsgId = this.batchQueue.poll(10L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.error("", e);
        }
        if (batchSpoutMsgId == null) {
            return;
        }
        if (!isCommitWait(batchSpoutMsgId)) {
            this.collector.emit(getStreamId(batchSpoutMsgId.getBatchStatus()), new Values(batchSpoutMsgId.getBatchId()), batchSpoutMsgId, new EmitCb(batchSpoutMsgId));
            return;
        }
        this.batchQueue.offer(batchSpoutMsgId);
        if (this.intervalCheck.check()) {
            LOG.info("Current msgId " + batchSpoutMsgId + ", but current commit BatchId is " + currentBatchId);
        } else {
            LOG.debug("Current msgId " + batchSpoutMsgId + ", but current commit BatchId is " + currentBatchId);
        }
    }

    protected void mkMsgId(BatchSpoutMsgId batchSpoutMsgId) {
        synchronized (BatchSpoutMsgId.class) {
            if (currentBatchId.getId() <= batchSpoutMsgId.getBatchId().getId()) {
                try {
                    this.zkClient.set_data(ZK_NODE_PATH, String.valueOf(currentBatchId.getId()).getBytes());
                } catch (Exception e) {
                    LOG.error("Failed to update to ZK " + batchSpoutMsgId, e);
                }
                currentBatchId = BatchId.incBatchId(batchSpoutMsgId.getBatchId());
            }
        }
        BatchSpoutMsgId mkInstance = BatchSpoutMsgId.mkInstance();
        this.batchQueue.offer(mkInstance);
        StringBuilder sb = new StringBuilder();
        sb.append("Create new BatchId,");
        sb.append("old:").append(batchSpoutMsgId);
        sb.append("new:").append(mkInstance);
        sb.append("currentBatchId:").append(currentBatchId);
        LOG.info(sb.toString());
    }

    protected void forward(BatchSpoutMsgId batchSpoutMsgId) {
        BatchStatus forward = batchSpoutMsgId.getBatchStatus().forward();
        if (forward == null) {
            mkMsgId(batchSpoutMsgId);
            LOG.info("Finish old batch " + batchSpoutMsgId);
        } else {
            batchSpoutMsgId.setBatchStatus(forward);
            this.batchQueue.offer(batchSpoutMsgId);
            LOG.info("Forward batch " + batchSpoutMsgId);
        }
    }

    @Override // backtype.storm.spout.ISpout
    public void ack(Object obj) {
        if (obj instanceof BatchSpoutMsgId) {
            forward((BatchSpoutMsgId) obj);
        } else {
            LOG.warn("Unknown type msgId " + obj.getClass().getName() + ":" + obj);
        }
    }

    protected void handleFail(BatchSpoutMsgId batchSpoutMsgId) {
        LOG.info("Failed batch " + batchSpoutMsgId);
        BatchStatus error = batchSpoutMsgId.getBatchStatus().error();
        if (error == BatchStatus.ERROR) {
            mkMsgId(batchSpoutMsgId);
        } else {
            batchSpoutMsgId.setBatchStatus(error);
            this.batchQueue.offer(batchSpoutMsgId);
        }
    }

    @Override // backtype.storm.spout.ISpout
    public void fail(Object obj) {
        if (obj instanceof BatchSpoutMsgId) {
            handleFail((BatchSpoutMsgId) obj);
        } else {
            LOG.warn("Unknown type msgId " + obj.getClass().getName() + ":" + obj);
        }
    }

    @Override // backtype.storm.topology.IComponent
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(BatchDef.COMPUTING_STREAM_ID, new Fields("BatchId"));
        outputFieldsDeclarer.declareStream(BatchDef.PREPARE_STREAM_ID, new Fields("BatchId"));
        outputFieldsDeclarer.declareStream(BatchDef.COMMIT_STREAM_ID, new Fields("BatchId"));
        outputFieldsDeclarer.declareStream(BatchDef.REVERT_STREAM_ID, new Fields("BatchId"));
        outputFieldsDeclarer.declareStream(BatchDef.POST_STREAM_ID, new Fields("BatchId"));
    }

    @Override // backtype.storm.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        HashMap hashMap = new HashMap();
        ConfigExtension.setSpoutSingleThread(hashMap, true);
        return hashMap;
    }
}
