package com.alibaba.jstorm.batch.impl;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.ReportedFailedException;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.TimeCacheMap;
import com.alibaba.jstorm.batch.BatchId;
import com.alibaba.jstorm.batch.ICommitter;
import com.alibaba.jstorm.batch.IPostCommit;
import com.alibaba.jstorm.batch.IPrepareCommit;
import com.alibaba.jstorm.batch.util.BatchCommon;
import com.alibaba.jstorm.batch.util.BatchDef;
import com.alibaba.jstorm.batch.util.BatchStatus;
import com.alibaba.jstorm.cluster.ClusterState;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/batch/impl/CoordinatedBolt.class */
public class CoordinatedBolt implements IRichBolt {
    private static final long serialVersionUID = 5720810158625748046L;
    private IBasicBolt delegate;
    private BasicOutputCollector basicCollector;
    private OutputCollector collector;
    private String taskId;
    private String taskName;
    private boolean isCommiter = false;
    private String zkCommitPath;
    private TimeCacheMap<Object, Object> commited;
    public static Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
    private static ClusterState zkClient = null;

    public CoordinatedBolt(IBasicBolt iBasicBolt) {
        this.delegate = iBasicBolt;
    }

    public void mkCommitDir(Map map) {
        try {
            zkClient = BatchCommon.getZkClient(map);
            this.zkCommitPath = "/commit/" + this.taskId;
            if (zkClient.node_existed(this.zkCommitPath, false)) {
                zkClient.delete_node(this.zkCommitPath);
            }
            zkClient.mkdirs(this.zkCommitPath);
            LOG.info(this.taskName + " successfully create commit path" + this.zkCommitPath);
        } catch (Exception e) {
            LOG.error("Failed to create zk node", e);
            throw new RuntimeException();
        }
    }

    @Override // backtype.storm.task.IBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.taskId = String.valueOf(topologyContext.getThisTaskId());
        this.taskName = topologyContext.getThisComponentId() + "_" + topologyContext.getThisTaskId();
        this.basicCollector = new BasicOutputCollector(outputCollector);
        this.collector = outputCollector;
        if (this.delegate instanceof ICommitter) {
            this.isCommiter = true;
            this.commited = new TimeCacheMap<>(topologyContext.maxTopologyMessageTimeout());
            mkCommitDir(map);
        }
        this.delegate.prepare(map, topologyContext);
    }

    public void removeUseless(String str, int i) throws Exception {
        List<String> list = zkClient.get_children(str, false);
        Collections.sort(list, new Comparator<String>() { // from class: com.alibaba.jstorm.batch.impl.CoordinatedBolt.1
            @Override // java.util.Comparator
            public int compare(String str2, String str3) {
                try {
                    return Long.valueOf(str2).compareTo(Long.valueOf(str3));
                } catch (Exception e) {
                    return str2.compareTo(str3);
                }
            }
        });
        for (int i2 = 0; i2 < list.size() - i; i2++) {
            zkClient.delete_node(str + "/" + list.get(i2));
        }
    }

    public String getCommitPath(BatchId batchId) {
        return this.zkCommitPath + "/" + batchId.getId();
    }

    public void updateToZk(Object obj, byte[] bArr) {
        try {
            removeUseless(this.zkCommitPath, 3);
            String commitPath = getCommitPath((BatchId) obj);
            byte[] bArr2 = bArr;
            if (bArr2 == null) {
                bArr2 = new byte[0];
            }
            zkClient.set_data(commitPath, bArr2);
            LOG.info("Update " + commitPath + " to zk");
        } catch (Exception e) {
            LOG.warn("Failed to update to zk,", e);
        }
    }

    public byte[] getCommittedData(Object obj) {
        try {
            return zkClient.get_data(getCommitPath((BatchId) obj), false);
        } catch (Exception e) {
            LOG.error("Failed to visit ZK,", e);
            return null;
        }
    }

    public void handleRegular(Tuple tuple) {
        this.basicCollector.setContext(tuple);
        try {
            this.delegate.execute(tuple, this.basicCollector);
            this.collector.ack(tuple);
        } catch (FailedException e) {
            if (e instanceof ReportedFailedException) {
                this.collector.reportError(e);
            }
            this.collector.fail(tuple);
        }
    }

    public void handlePrepareCommit(Tuple tuple) {
        this.basicCollector.setContext(tuple);
        try {
            ((IPrepareCommit) this.delegate).prepareCommit((BatchId) tuple.getValue(0), this.basicCollector);
            this.collector.ack(tuple);
        } catch (FailedException e) {
            if (e instanceof ReportedFailedException) {
                this.collector.reportError(e);
            }
            this.collector.fail(tuple);
        }
    }

    public void handleCommit(Tuple tuple) {
        Object value = tuple.getValue(0);
        try {
            byte[] commit = ((ICommitter) this.delegate).commit((BatchId) value);
            this.collector.ack(tuple);
            updateToZk(value, commit);
            this.commited.put(value, commit);
        } catch (Exception e) {
            LOG.error("Failed to commit ", e);
            this.collector.fail(tuple);
        }
    }

    public void handleRevert(Tuple tuple) {
        try {
            Object value = tuple.getValue(0);
            byte[] committedData = this.commited.containsKey(value) ? (byte[]) this.commited.get(value) : getCommittedData(value);
            if (committedData != null) {
                ((ICommitter) this.delegate).revert((BatchId) value, committedData);
            }
        } catch (Exception e) {
            LOG.error("Failed to revert,", e);
        }
        this.collector.ack(tuple);
    }

    public void handlePostCommit(Tuple tuple) {
        this.basicCollector.setContext(tuple);
        try {
            ((IPostCommit) this.delegate).postCommit((BatchId) tuple.getValue(0), this.basicCollector);
        } catch (Exception e) {
            LOG.info("Failed to do postCommit,", e);
        }
        this.collector.ack(tuple);
    }

    @Override // backtype.storm.task.IBolt
    public void execute(Tuple tuple) {
        BatchStatus batchStatus = getBatchStatus(tuple);
        if (batchStatus == BatchStatus.COMPUTING) {
            handleRegular(tuple);
            return;
        }
        if (batchStatus == BatchStatus.PREPARE_COMMIT) {
            handlePrepareCommit(tuple);
            return;
        }
        if (batchStatus == BatchStatus.COMMIT) {
            handleCommit(tuple);
        } else if (batchStatus == BatchStatus.REVERT_COMMIT) {
            handleRevert(tuple);
        } else {
            if (batchStatus != BatchStatus.POST_COMMIT) {
                throw new RuntimeException("Receive commit tuple, but not committer");
            }
            handlePostCommit(tuple);
        }
    }

    @Override // backtype.storm.task.IBolt
    public void cleanup() {
        this.delegate.cleanup();
    }

    @Override // backtype.storm.topology.IComponent
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        this.delegate.declareOutputFields(outputFieldsDeclarer);
    }

    @Override // backtype.storm.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        return this.delegate.getComponentConfiguration();
    }

    private BatchStatus getBatchStatus(Tuple tuple) {
        String sourceStreamId = tuple.getSourceStreamId();
        return sourceStreamId.equals(BatchDef.PREPARE_STREAM_ID) ? BatchStatus.PREPARE_COMMIT : sourceStreamId.equals(BatchDef.COMMIT_STREAM_ID) ? BatchStatus.COMMIT : sourceStreamId.equals(BatchDef.REVERT_STREAM_ID) ? BatchStatus.REVERT_COMMIT : sourceStreamId.equals(BatchDef.POST_STREAM_ID) ? BatchStatus.POST_COMMIT : BatchStatus.COMPUTING;
    }
}
