/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.batch.impl;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.ICollectorCallback;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.alibaba.jstorm.batch.BatchId;
import com.alibaba.jstorm.batch.impl.BatchSpoutMsgId;
import com.alibaba.jstorm.batch.util.BatchCommon;
import com.alibaba.jstorm.batch.util.BatchStatus;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.ClusterState;
import com.alibaba.jstorm.utils.IntervalCheck;
import com.alibaba.jstorm.utils.JStormUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spout that triggers batch processing.
 *
 * <p>It keeps a queue of {@link BatchSpoutMsgId}s. Each call to {@link #nextTuple()} takes one
 * id from the queue and emits its {@link BatchId} on the stream that corresponds to the id's
 * current {@link BatchStatus}. When the tuple is acked the id is moved to its next status and
 * re-queued; when no next status exists (or a failure resolves to {@code BatchStatus.ERROR})
 * the id is replaced by a freshly created one.
 *
 * <p>The current batch id is written to the ZK node {@value #ZK_NODE_PATH} as a decimal string
 * and read back in {@link #initMsgId()}.
 *
 * <p>NOTE(review): {@code currentBatchId} is a static field, so it is shared by every instance
 * of this class loaded in the same JVM.
 */
public class BatchSpoutTrigger implements IRichSpout {
    private static final long serialVersionUID = 7215109169247425954L;
    private static final Logger LOG = LoggerFactory.getLogger(BatchSpoutTrigger.class);

    /** ZK node where the current batch id is persisted as a decimal string. */
    private static final String ZK_NODE_PATH = "/trigger";

    /** Batch id that commit-type statuses are compared against in {@link #isCommitWait}. */
    private static BatchId currentBatchId = null;

    /** Message ids waiting to be emitted; created in {@link #open}. */
    private LinkedBlockingQueue<BatchSpoutMsgId> batchQueue;
    private transient ClusterState zkClient;
    private transient SpoutOutputCollector collector;
    private Map conf;
    private String taskName;
    /** Decides whether the "commit wait" message is logged at info or debug level. */
    private IntervalCheck intervalCheck;

    /**
     * Restores the batch id from ZK (when present and numeric) and fills the queue with
     * {@code topology.max.spout.pending} (default 1) fresh message ids.
     *
     * @throws Exception if reading the ZK node fails
     */
    public void initMsgId() throws Exception {
        Long zkMsgId = null;
        byte[] data = this.zkClient.get_data(ZK_NODE_PATH, false);
        if (data != null) {
            // The id is stored as ASCII digits; decode with an explicit charset so the result
            // does not depend on the platform default.
            String value = new String(data, StandardCharsets.UTF_8);
            try {
                zkMsgId = Long.valueOf(value);
                LOG.info("ZK msgId:" + zkMsgId);
            } catch (NumberFormatException e) {
                // Unparseable content is not fatal: start without a restored id.
                LOG.warn("Failed to get msgId ", e);
            }
        }
        if (zkMsgId != null) {
            BatchId.updateId(zkMsgId);
        }

        int maxSpoutPending =
                JStormUtils.parseInt(this.conf.get("topology.max.spout.pending"), 1);
        for (int i = 0; i < maxSpoutPending; ++i) {
            BatchSpoutMsgId msgId = BatchSpoutMsgId.mkInstance();
            if (currentBatchId == null) {
                // The first id created becomes the initial current batch.
                currentBatchId = msgId.getBatchId();
            }
            this.batchQueue.offer(msgId);
            LOG.info("Push into queue," + msgId);
        }
    }

    /**
     * Creates the queue, connects to ZK and seeds the queue via {@link #initMsgId()}.
     *
     * @throws RuntimeException if the ZK client cannot be created or initialization fails;
     *         the original exception is attached as the cause
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.batchQueue = new LinkedBlockingQueue<BatchSpoutMsgId>();
        this.collector = collector;
        this.conf = conf;
        this.taskName = context.getThisComponentId() + "_" + context.getThisTaskId();
        this.intervalCheck = new IntervalCheck();
        try {
            this.zkClient = BatchCommon.getZkClient(conf);
            this.initMsgId();
        } catch (Exception e) {
            LOG.error("", e);
            // Keep the cause so the failure can be diagnosed from the rethrown exception.
            throw new RuntimeException("Failed to init", e);
        }
        LOG.info("Successfully open " + this.taskName);
    }

    /** Closes the ZK client if one was created by {@link #open}. */
    @Override
    public void close() {
        // zkClient stays null when open() failed before (or while) creating it.
        if (this.zkClient != null) {
            this.zkClient.close();
        }
    }

    /** Logs the activation; no other action is taken. */
    @Override
    public void activate() {
        LOG.info("Activate " + this.taskName);
    }

    /** Logs the deactivation; no other action is taken. */
    @Override
    public void deactivate() {
        LOG.info("Deactivate " + this.taskName);
    }

    /**
     * Maps a batch status to the id of the stream its tuples are emitted on.
     *
     * @param batchStatus status to map
     * @return the stream id declared in {@link #declareOutputFields} for that status
     * @throws RuntimeException if the status has no associated stream
     */
    protected String getStreamId(BatchStatus batchStatus) {
        if (batchStatus == BatchStatus.COMPUTING) {
            return "batch/compute-stream";
        }
        if (batchStatus == BatchStatus.PREPARE_COMMIT) {
            // Spelling is part of the stream id and must match declareOutputFields.
            return "batch/parepare-stream";
        }
        if (batchStatus == BatchStatus.COMMIT) {
            return "batch/commit-stream";
        }
        if (batchStatus == BatchStatus.POST_COMMIT) {
            return "batch/post-stream";
        }
        if (batchStatus == BatchStatus.REVERT_COMMIT) {
            return "batch/revert-stream";
        }
        LOG.error("Occur unkonw type BatchStatus " + batchStatus);
        throw new RuntimeException("Unknown BatchStatus " + batchStatus);
    }

    /**
     * @return {@code true} when the status is {@code COMMIT} or {@code REVERT_COMMIT}
     */
    protected boolean isCommitStatus(BatchStatus batchStatus) {
        return batchStatus == BatchStatus.COMMIT || batchStatus == BatchStatus.REVERT_COMMIT;
    }

    /**
     * Tells whether a message id must wait before being emitted.
     *
     * @return {@code true} when the id is in a commit status and its batch id is greater than
     *         the current batch id; presumably this keeps commits in batch-id order
     */
    protected boolean isCommitWait(BatchSpoutMsgId msgId) {
        if (!this.isCommitStatus(msgId.getBatchStatus())) {
            return false;
        }
        return currentBatchId.getId() < msgId.getBatchId().getId();
    }

    /**
     * Takes at most one message id from the queue (waiting up to 10 ms) and emits its batch id
     * on the stream for its status. Ids that must wait for an earlier commit are re-queued.
     */
    @Override
    public void nextTuple() {
        BatchSpoutMsgId msgId = null;
        try {
            msgId = this.batchQueue.poll(10L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.error("", e);
            // Restore the flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        if (msgId == null) {
            return;
        }

        if (this.isCommitWait(msgId)) {
            this.batchQueue.offer(msgId);
            if (this.intervalCheck.check()) {
                LOG.info("Current msgId {}, but current commit BatchId is {}",
                        msgId, currentBatchId);
            } else {
                // Parameterized form avoids building the message when debug is disabled.
                LOG.debug("Current msgId {}, but current commit BatchId is {}",
                        msgId, currentBatchId);
            }
            return;
        }

        String streamId = this.getStreamId(msgId.getBatchStatus());
        this.collector.emit(streamId, new Values(msgId.getBatchId()), msgId, new EmitCb(msgId));
    }

    /**
     * Retires {@code oldMsgId} and queues a freshly created message id in its place.
     *
     * <p>When the current batch id is not greater than the old one, the current id is written
     * to ZK and the current batch id is advanced via {@code BatchId.incBatchId}. A failed ZK
     * write is logged and does not stop the advance.
     *
     * @param oldMsgId the message id being replaced
     */
    protected void mkMsgId(BatchSpoutMsgId oldMsgId) {
        // The whole method runs under the BatchSpoutMsgId class monitor.
        synchronized (BatchSpoutMsgId.class) {
            if (currentBatchId.getId() <= oldMsgId.getBatchId().getId()) {
                byte[] data =
                        String.valueOf(currentBatchId.getId()).getBytes(StandardCharsets.UTF_8);
                try {
                    this.zkClient.set_data(ZK_NODE_PATH, data);
                } catch (Exception e) {
                    // Best effort: the batch still advances when persisting fails.
                    LOG.error("Failed to update to ZK " + oldMsgId, e);
                }
                currentBatchId = BatchId.incBatchId(oldMsgId.getBatchId());
            }

            BatchSpoutMsgId newMsgId = BatchSpoutMsgId.mkInstance();
            this.batchQueue.offer(newMsgId);

            StringBuilder sb = new StringBuilder();
            sb.append("Create new BatchId,");
            sb.append("old:").append(oldMsgId);
            sb.append("new:").append(newMsgId);
            sb.append("currentBatchId:").append(currentBatchId);
            LOG.info(sb.toString());
        }
    }

    /**
     * Moves a message id to the status returned by {@code BatchStatus.forward()} and re-queues
     * it, or replaces it through {@link #mkMsgId} when there is no next status.
     */
    protected void forward(BatchSpoutMsgId msgId) {
        BatchStatus newStatus = msgId.getBatchStatus().forward();
        if (newStatus == null) {
            this.mkMsgId(msgId);
            LOG.info("Finish old batch " + msgId);
        } else {
            msgId.setBatchStatus(newStatus);
            this.batchQueue.offer(msgId);
            LOG.info("Forward batch " + msgId);
        }
    }

    /**
     * Forwards the acked batch to its next status. Ids of any other type (including
     * {@code null}) are logged and ignored.
     */
    @Override
    public void ack(Object msgId) {
        if (msgId instanceof BatchSpoutMsgId) {
            this.forward((BatchSpoutMsgId) msgId);
            return;
        }
        LOG.warn("Unknown type msgId " + typeNameOf(msgId) + ":" + msgId);
    }

    /**
     * Moves a failed message id to the status returned by {@code BatchStatus.error()} and
     * re-queues it, or replaces it through {@link #mkMsgId} when that status is
     * {@code BatchStatus.ERROR}.
     */
    protected void handleFail(BatchSpoutMsgId msgId) {
        LOG.info("Failed batch " + msgId);
        BatchStatus newStatus = msgId.getBatchStatus().error();
        if (newStatus == BatchStatus.ERROR) {
            this.mkMsgId(msgId);
        } else {
            msgId.setBatchStatus(newStatus);
            this.batchQueue.offer(msgId);
        }
    }

    /**
     * Handles a failed batch via {@link #handleFail}. Ids of any other type (including
     * {@code null}) are logged and ignored.
     */
    @Override
    public void fail(Object msgId) {
        if (!(msgId instanceof BatchSpoutMsgId)) {
            LOG.warn("Unknown type msgId " + typeNameOf(msgId) + ":" + msgId);
            return;
        }
        this.handleFail((BatchSpoutMsgId) msgId);
    }

    /** Returns the runtime class name of {@code msgId}, or {@code "null"} for a null id. */
    private static String typeNameOf(Object msgId) {
        return msgId == null ? "null" : msgId.getClass().getName();
    }

    /** Declares the five batch streams, each carrying a single {@code "BatchId"} field. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("batch/compute-stream", new Fields("BatchId"));
        declarer.declareStream("batch/parepare-stream", new Fields("BatchId"));
        declarer.declareStream("batch/commit-stream", new Fields("BatchId"));
        declarer.declareStream("batch/revert-stream", new Fields("BatchId"));
        declarer.declareStream("batch/post-stream", new Fields("BatchId"));
    }

    /**
     * @return a new map on which {@code ConfigExtension.setSpoutSingleThread(map, true)} has
     *         been applied
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        HashMap<String, Object> map = new HashMap<String, Object>();
        ConfigExtension.setSpoutSingleThread(map, true);
        return map;
    }

    /**
     * Emit callback that forwards the batch right away when the emitted tuple was delivered to
     * no task; presumably no ack would arrive in that case (TODO confirm).
     */
    class EmitCb implements ICollectorCallback {
        private final BatchSpoutMsgId msgId;

        public EmitCb(BatchSpoutMsgId msgId) {
            this.msgId = msgId;
        }

        @Override
        public void execute(List<Integer> outTasks) {
            if (outTasks.isEmpty()) {
                BatchSpoutTrigger.this.forward(this.msgId);
            }
        }
    }
}

