/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.batch.impl;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.ReportedFailedException;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.TimeCacheMap;
import com.alibaba.jstorm.batch.BatchId;
import com.alibaba.jstorm.batch.ICommitter;
import com.alibaba.jstorm.batch.IPostCommit;
import com.alibaba.jstorm.batch.IPrepareCommit;
import com.alibaba.jstorm.batch.util.BatchCommon;
import com.alibaba.jstorm.batch.util.BatchStatus;
import com.alibaba.jstorm.cluster.ClusterState;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoordinatedBolt
implements IRichBolt {
    private static final long serialVersionUID = 5720810158625748046L;
    public static Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
    private IBasicBolt delegate;
    private BasicOutputCollector basicCollector;
    private OutputCollector collector;
    private String taskId;
    private String taskName;
    private boolean isCommiter = false;
    private String zkCommitPath;
    private TimeCacheMap<Object, Object> commited;
    private static ClusterState zkClient = null;

    public CoordinatedBolt(IBasicBolt delegate) {
        this.delegate = delegate;
    }

    public void mkCommitDir(Map conf) {
        try {
            zkClient = BatchCommon.getZkClient(conf);
            this.zkCommitPath = "/commit/" + this.taskId;
            if (zkClient.node_existed(this.zkCommitPath, false)) {
                zkClient.delete_node(this.zkCommitPath);
            }
            zkClient.mkdirs(this.zkCommitPath);
            LOG.info(this.taskName + " successfully create commit path" + this.zkCommitPath);
        }
        catch (Exception e) {
            LOG.error("Failed to create zk node", (Throwable)e);
            throw new RuntimeException();
        }
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.taskId = String.valueOf(context.getThisTaskId());
        this.taskName = context.getThisComponentId() + "_" + context.getThisTaskId();
        this.basicCollector = new BasicOutputCollector(collector);
        this.collector = collector;
        if (this.delegate instanceof ICommitter) {
            this.isCommiter = true;
            this.commited = new TimeCacheMap(context.maxTopologyMessageTimeout());
            this.mkCommitDir(conf);
        }
        this.delegate.prepare(conf, context);
    }

    public void removeUseless(String path, int reserveSize) throws Exception {
        List<String> childs = zkClient.get_children(path, false);
        Collections.sort(childs, new Comparator<String>(){

            @Override
            public int compare(String o1, String o2) {
                try {
                    Long v1 = Long.valueOf(o1);
                    Long v2 = Long.valueOf(o2);
                    return v1.compareTo(v2);
                }
                catch (Exception e) {
                    return o1.compareTo(o2);
                }
            }
        });
        for (int index = 0; index < childs.size() - reserveSize; ++index) {
            zkClient.delete_node(path + "/" + childs.get(index));
        }
    }

    public String getCommitPath(BatchId id) {
        return this.zkCommitPath + "/" + id.getId();
    }

    public void updateToZk(Object id, byte[] commitResult) {
        try {
            this.removeUseless(this.zkCommitPath, 3);
            String path = this.getCommitPath((BatchId)id);
            byte[] data = commitResult;
            if (data == null) {
                data = new byte[]{};
            }
            zkClient.set_data(path, data);
            LOG.info("Update " + path + " to zk");
        }
        catch (Exception e) {
            LOG.warn("Failed to update to zk,", (Throwable)e);
        }
    }

    public byte[] getCommittedData(Object id) {
        try {
            String path = this.getCommitPath((BatchId)id);
            byte[] data = zkClient.get_data(path, false);
            return data;
        }
        catch (Exception e) {
            LOG.error("Failed to visit ZK,", (Throwable)e);
            return null;
        }
    }

    public void handleRegular(Tuple tuple) {
        this.basicCollector.setContext(tuple);
        try {
            this.delegate.execute(tuple, this.basicCollector);
            this.collector.ack(tuple);
        }
        catch (FailedException e) {
            if (e instanceof ReportedFailedException) {
                this.collector.reportError(e);
            }
            this.collector.fail(tuple);
        }
    }

    public void handlePrepareCommit(Tuple tuple) {
        this.basicCollector.setContext(tuple);
        try {
            BatchId id = (BatchId)tuple.getValue(0);
            ((IPrepareCommit)((Object)this.delegate)).prepareCommit(id, this.basicCollector);
            this.collector.ack(tuple);
        }
        catch (FailedException e) {
            if (e instanceof ReportedFailedException) {
                this.collector.reportError(e);
            }
            this.collector.fail(tuple);
        }
    }

    public void handleCommit(Tuple tuple) {
        Object id = tuple.getValue(0);
        try {
            byte[] commitResult = ((ICommitter)((Object)this.delegate)).commit((BatchId)id);
            this.collector.ack(tuple);
            this.updateToZk(id, commitResult);
            this.commited.put(id, commitResult);
        }
        catch (Exception e) {
            LOG.error("Failed to commit ", (Throwable)e);
            this.collector.fail(tuple);
        }
    }

    public void handleRevert(Tuple tuple) {
        try {
            Object id = tuple.getValue(0);
            byte[] commitResult = null;
            commitResult = this.commited.containsKey(id) ? (byte[])this.commited.get(id) : this.getCommittedData(id);
            if (commitResult != null) {
                ((ICommitter)((Object)this.delegate)).revert((BatchId)id, commitResult);
            }
        }
        catch (Exception e) {
            LOG.error("Failed to revert,", (Throwable)e);
        }
        this.collector.ack(tuple);
    }

    public void handlePostCommit(Tuple tuple) {
        this.basicCollector.setContext(tuple);
        try {
            BatchId id = (BatchId)tuple.getValue(0);
            ((IPostCommit)((Object)this.delegate)).postCommit(id, this.basicCollector);
        }
        catch (Exception e) {
            LOG.info("Failed to do postCommit,", (Throwable)e);
        }
        this.collector.ack(tuple);
    }

    @Override
    public void execute(Tuple tuple) {
        BatchStatus batchStatus = this.getBatchStatus(tuple);
        if (batchStatus == BatchStatus.COMPUTING) {
            this.handleRegular(tuple);
        } else if (batchStatus == BatchStatus.PREPARE_COMMIT) {
            this.handlePrepareCommit(tuple);
        } else if (batchStatus == BatchStatus.COMMIT) {
            this.handleCommit(tuple);
        } else if (batchStatus == BatchStatus.REVERT_COMMIT) {
            this.handleRevert(tuple);
        } else if (batchStatus == BatchStatus.POST_COMMIT) {
            this.handlePostCommit(tuple);
        } else {
            throw new RuntimeException("Receive commit tuple, but not committer");
        }
    }

    @Override
    public void cleanup() {
        this.delegate.cleanup();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        this.delegate.declareOutputFields(declarer);
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return this.delegate.getComponentConfiguration();
    }

    private BatchStatus getBatchStatus(Tuple tuple) {
        String streamId = tuple.getSourceStreamId();
        if (streamId.equals("batch/parepare-stream")) {
            return BatchStatus.PREPARE_COMMIT;
        }
        if (streamId.equals("batch/commit-stream")) {
            return BatchStatus.COMMIT;
        }
        if (streamId.equals("batch/revert-stream")) {
            return BatchStatus.REVERT_COMMIT;
        }
        if (streamId.equals("batch/post-stream")) {
            return BatchStatus.POST_COMMIT;
        }
        return BatchStatus.COMPUTING;
    }
}

