/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task.backpressure;

import backtype.storm.generated.Bolt;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.task.backpressure.Backpressure;
import com.alibaba.jstorm.task.backpressure.SourceBackpressureInfo;
import com.alibaba.jstorm.task.backpressure.TargetBackpressureInfo;
import com.alibaba.jstorm.task.execute.BoltCollector;
import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinates backpressure for one topology from the task identified by
 * {@code taskId} in the constructor (stored as the topology master id).
 *
 * <p>Tasks report {@link TopoMasterCtrlEvent}s (tuple field {@code "ctrlEvent"}) to
 * {@link #process(Tuple)}. For every spout the coordinator keeps a
 * {@link SourceBackpressureInfo}, which in turn holds one
 * {@link TargetBackpressureInfo} per downstream component that reported
 * backpressure. Based on that state it forwards start/stop events to the tasks of
 * the affected spouts and bolts over the {@code "__master_control_stream"} stream,
 * and it stores the state through {@link StormClusterState#set_backpressure_info}.
 *
 * <p>The flags {@code isBackpressureEnable} and {@code triggerBpRatio} as well as
 * {@code updateConfig(Map)} are inherited from {@link Backpressure}.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its state; it
 * presumably relies on being driven by a single thread - confirm with the caller.
 */
public class BackpressureCoordinator extends Backpressure {
    private static final Logger LOG = LoggerFactory.getLogger(BackpressureCoordinator.class);
    /** Minimum increase of the reported flow-control time that re-triggers a start event. */
    private static final int ADJUSTED_TIME = 5;
    private static final String BACKPRESSURE_TAG = "Backpressure has been ";

    private final TopologyContext context;
    private final StormTopology topology;
    private final OutputCollector output;
    private final int topologyMasterId;
    private final Map<Integer, String> taskIdToComponentId;
    private final Map<String, SpoutSpec> spouts;
    private final Map<String, Bolt> bolts;
    /** Backpressure state keyed by spout component id. */
    private final Map<String, SourceBackpressureInfo> sourceToBackpressureInfo;
    /** Check interval multiplied by the trigger sample number; compared against millisecond timestamps. */
    private final int period;
    private final StormClusterState zkCluster;

    /**
     * Builds the coordinator and restores previously stored backpressure state.
     *
     * @param output          collector whose delegate is used to emit control events
     * @param topologyContext context providing topology, configuration and cluster state
     * @param taskId          id of the task hosting this coordinator; it is excluded
     *                        from configuration broadcasts
     */
    public BackpressureCoordinator(OutputCollector output, TopologyContext topologyContext, Integer taskId) {
        super(topologyContext.getStormConf());
        this.context = topologyContext;
        this.topology = topologyContext.getRawTopology();
        // Private copies, so lookups do not depend on later changes to the topology object.
        this.spouts = new HashMap<String, SpoutSpec>();
        if (this.topology.get_spouts() != null) {
            this.spouts.putAll(this.topology.get_spouts());
        }
        this.bolts = new HashMap<String, Bolt>();
        if (this.topology.get_bolts() != null) {
            this.bolts.putAll(this.topology.get_bolts());
        }
        this.taskIdToComponentId = topologyContext.getTaskToComponent();
        this.topologyMasterId = taskId;
        this.output = output;
        int checkInterval = ConfigExtension.getBackpressureCheckIntervl(this.context.getStormConf());
        int sampleNum = ConfigExtension.getBackpressureTriggerSampleNumber(this.context.getStormConf());
        this.period = checkInterval * sampleNum;
        this.zkCluster = topologyContext.getZkCluster();
        this.sourceToBackpressureInfo = loadBackpressureInfo(this.zkCluster, topologyContext);
    }

    /**
     * Reads the stored backpressure state of the topology.
     *
     * @return the stored map, or a new empty map when nothing is stored or the read fails
     */
    private static Map<String, SourceBackpressureInfo> loadBackpressureInfo(StormClusterState zkCluster, TopologyContext context) {
        try {
            Map<String, SourceBackpressureInfo> stored = zkCluster.get_backpressure_info(context.getTopologyId());
            if (stored != null) {
                LOG.info("Successfully retrieve existing SourceTobackpressureInfo from zk: " + stored);
                return stored;
            }
        } catch (Exception e) {
            // Best effort: a failed read must not prevent the coordinator from starting.
            LOG.warn("Failed to get SourceTobackpressureInfo from zk", e);
        }
        return new HashMap<String, SourceBackpressureInfo>();
    }

    /**
     * Collects the ids of all spouts that feed the given bolt, directly or through
     * other bolts.
     *
     * @param topology            unused; the cached spout and bolt maps are consulted instead
     * @param boltComponentId     component id of the bolt to start from
     * @param componentsTraversed components already visited (guards against cycles and
     *                            repeated work); {@code null} starts a fresh traversal
     * @return the upstream spout ids; empty when the id is not a known bolt
     */
    private Set<String> getInputSpoutsForBolt(StormTopology topology, String boltComponentId, Set<String> componentsTraversed) {
        Set<String> ret = new HashSet<String>();
        if (componentsTraversed == null) {
            componentsTraversed = new HashSet<String>();
        }
        Bolt bolt = this.bolts.get(boltComponentId);
        if (bolt == null) {
            return ret;
        }
        ComponentCommon common = bolt.get_common();
        for (GlobalStreamId streamId : common.get_inputs().keySet()) {
            String inputComponent = streamId.get_componentId();
            // add() returns false for components that were already visited.
            if (!componentsTraversed.add(inputComponent)) {
                continue;
            }
            if (this.spouts.containsKey(inputComponent)) {
                ret.add(inputComponent);
            } else if (this.bolts.containsKey(inputComponent)) {
                ret.addAll(this.getInputSpoutsForBolt(topology, inputComponent, componentsTraversed));
            }
        }
        return ret;
    }

    /**
     * Handles one control tuple. Does nothing while backpressure is disabled. Tuples
     * from spouts are only handled when state is recorded for that spout; tuples from
     * components that are neither spout nor bolt are ignored.
     *
     * @param input tuple carrying a {@link TopoMasterCtrlEvent} in field {@code "ctrlEvent"}
     */
    public void process(Tuple input) {
        if (!this.isBackpressureEnable) {
            return;
        }
        int sourceTask = input.getSourceTask();
        String componentId = this.taskIdToComponentId.get(sourceTask);
        if (componentId == null) {
            LOG.warn("Receive tuple from unknown task-" + sourceTask);
            return;
        }
        if (this.spouts.containsKey(componentId)) {
            if (this.sourceToBackpressureInfo.get(componentId) != null) {
                this.handleEventFromSpout(sourceTask, input);
            }
        } else if (this.bolts.containsKey(componentId)) {
            this.handleEventFromBolt(sourceTask, input);
        }
    }

    /**
     * Applies a new configuration and broadcasts it. When the new configuration
     * disables backpressure, all tracked state is dropped. The
     * {@code updateBackpressureConfig} event is sent to every known task except the
     * topology master task and the tasks of component {@code "__acker"}; afterwards
     * the current status is reported.
     *
     * @param conf the new configuration, forwarded unchanged inside the event
     */
    public void updateBackpressureConfig(Map conf) {
        this.updateConfig(conf);
        if (!this.isBackpressureEnable) {
            LOG.info("Disable backpressure in coordinator.");
            this.sourceToBackpressureInfo.clear();
        } else {
            LOG.info("Enable backpressure in coordinator.");
        }
        TopoMasterCtrlEvent updateBpConfig = new TopoMasterCtrlEvent(TopoMasterCtrlEvent.EventType.updateBackpressureConfig, new ArrayList<Object>());
        updateBpConfig.addEventValue(conf);
        Values values = new Values(updateBpConfig);
        Set<Integer> targetTasks = new HashSet<Integer>(this.taskIdToComponentId.keySet());
        targetTasks.remove(this.topologyMasterId);
        targetTasks.removeAll(this.context.getComponentTasks("__acker"));
        this.sendBackpressureMessage(targetTasks, values, TopoMasterCtrlEvent.EventType.updateBackpressureConfig);
        this.reportBackpressureStatus();
    }

    /**
     * @param spouts spout component ids to check; may be {@code null}
     * @return {@code true} when at least one of the spouts has recorded state with a
     *         non-empty task set
     */
    private boolean checkSpoutsUnderBackpressure(Set<String> spouts) {
        if (spouts == null) {
            return false;
        }
        for (String spout : spouts) {
            SourceBackpressureInfo backpressureInfo = this.sourceToBackpressureInfo.get(spout);
            if (backpressureInfo != null && !backpressureInfo.getTasks().isEmpty()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up the state recorded for {@code targetComponentId} under {@code sourceSpout}.
     *
     * @param created when {@code true}, missing spout and target entries are created
     * @return the target state, or {@code null} when it is missing and {@code created}
     *         is {@code false}
     */
    private TargetBackpressureInfo getBackpressureInfoBySourceSpout(String sourceSpout, String targetComponentId, boolean created) {
        SourceBackpressureInfo info = this.sourceToBackpressureInfo.get(sourceSpout);
        if (info == null) {
            if (!created) {
                return null;
            }
            info = new SourceBackpressureInfo();
            this.sourceToBackpressureInfo.put(sourceSpout, info);
        }
        TargetBackpressureInfo ret = info.getTargetTasks().get(targetComponentId);
        if (ret == null && created) {
            ret = new TargetBackpressureInfo();
            info.getTargetTasks().put(targetComponentId, ret);
        }
        return ret;
    }

    /**
     * @param time a timestamp from {@link System#currentTimeMillis()}, or {@code 0} for "never"
     * @return {@code true} when {@code time} is set and lies more than {@code period} in the past
     */
    private boolean checkIntervalExpired(long time) {
        return time != 0L && System.currentTimeMillis() - time > (long)this.period;
    }

    /**
     * Emits {@code value} directly to each target task on {@code "__master_control_stream"}.
     *
     * @param backpressureType only used for the debug log line
     */
    private void sendBackpressureMessage(Set<Integer> targetTasks, Values value, TopoMasterCtrlEvent.EventType backpressureType) {
        if (targetTasks.isEmpty()) {
            return;
        }
        BoltCollector collector = (BoltCollector)this.output.getDelegate();
        for (Integer taskId : targetTasks) {
            collector.emitDirectCtrl(taskId, "__master_control_stream", null, value);
            LOG.debug("Send {} request to taskId-{}", backpressureType, taskId);
        }
    }

    /**
     * Handles an event sent by a spout task. Only {@code stopBackpressure} is expected:
     * the task is removed from the spout's state and, once no task of the spout is
     * left, the spout entry is dropped and the stop event is forwarded to every
     * recorded target component none of whose upstream spouts is still under
     * backpressure.
     */
    private void handleEventFromSpout(int sourceTask, Tuple input) {
        TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent)input.getValueByField("ctrlEvent");
        TopoMasterCtrlEvent.EventType type = ctrlEvent.getEventType();
        if (!TopoMasterCtrlEvent.EventType.stopBackpressure.equals(type)) {
            LOG.warn("Received unexpected event, " + type);
            return;
        }
        String spoutComponentId = this.taskIdToComponentId.get(sourceTask);
        SourceBackpressureInfo info = this.sourceToBackpressureInfo.get(spoutComponentId);
        if (info == null) {
            LOG.error("Received event from non-recorded spout-" + sourceTask);
            return;
        }
        info.getTasks().remove(sourceTask);
        if (info.getTasks().isEmpty()) {
            // Drop the entry first so the check below no longer counts this spout.
            this.sourceToBackpressureInfo.remove(spoutComponentId);
            for (String componentId : info.getTargetTasks().keySet()) {
                Set<String> sourceSpouts = this.getInputSpoutsForBolt(this.topology, componentId, null);
                if (this.checkSpoutsUnderBackpressure(sourceSpouts)) {
                    continue;
                }
                Set<Integer> tasks = new HashSet<Integer>(this.context.getComponentTasks(componentId));
                this.sendBackpressureMessage(tasks, new Values(ctrlEvent), type);
            }
        }
        this.reportBackpressureStatus();
    }

    /**
     * Handles a start or stop event sent by a bolt task.
     *
     * <p>Start: the task is recorded under every upstream spout. When a start should
     * be propagated (see {@link #shouldPropagateStart}) and the share of the bolt's
     * tasks that reported backpressure reaches {@code triggerBpRatio}, all tasks of
     * that spout are marked and notified.
     *
     * <p>Stop: the task is removed from every upstream spout's state. Spouts whose
     * latest timestamp is older than {@code period} get a stop event, and the
     * reporting task itself is notified once none of its upstream spouts is under
     * backpressure.
     */
    private void handleEventFromBolt(int sourceTask, Tuple input) {
        String componentId = this.taskIdToComponentId.get(sourceTask);
        Set<String> inputSpouts = this.getInputSpoutsForBolt(this.topology, componentId, null);
        TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent)input.getValueByField("ctrlEvent");
        TopoMasterCtrlEvent.EventType type = ctrlEvent.getEventType();
        Set<Integer> notifyList = new HashSet<Integer>();
        Values values;
        boolean update = false;
        if (TopoMasterCtrlEvent.EventType.startBackpressure.equals(type)) {
            int flowCtrlTime = (Integer)ctrlEvent.getEventValue().get(0);
            TargetBackpressureInfo info = null;
            for (String spout : inputSpouts) {
                info = this.getBackpressureInfoBySourceSpout(spout, componentId, true);
                SourceBackpressureInfo sourceInfo = this.sourceToBackpressureInfo.get(spout);
                // NOTE(review): update is overwritten on every iteration, so only the
                // last spout decides whether the status is reported - kept as is.
                update = info.getTasks().add(sourceTask);
                // Decide before the setters below change the target's recorded state.
                boolean add = this.shouldPropagateStart(sourceInfo, flowCtrlTime);
                info.setFlowCtrlTime(flowCtrlTime);
                info.setBackpressureStatus(type);
                if (!add) {
                    update = false;
                    continue;
                }
                info.setTimeStamp(System.currentTimeMillis());
                double taskBpRatio = (double)info.getTasks().size() / (double)this.context.getComponentTasks(componentId).size();
                if (taskBpRatio >= this.triggerBpRatio) {
                    Set<Integer> spoutTasks = new HashSet<Integer>(this.context.getComponentTasks(spout));
                    sourceInfo.getTasks().addAll(spoutTasks);
                    notifyList.addAll(spoutTasks);
                } else {
                    update = false;
                }
            }
            ArrayList<Object> value = new ArrayList<Object>();
            // info stays null when the bolt has no upstream spout; fall back to the
            // reported time instead of dereferencing null (nothing is notified then).
            value.add(info != null ? info.getFlowCtrlTime() : flowCtrlTime);
            values = new Values(new TopoMasterCtrlEvent(TopoMasterCtrlEvent.EventType.startBackpressure, value));
        } else if (TopoMasterCtrlEvent.EventType.stopBackpressure.equals(type)) {
            for (String spout : inputSpouts) {
                TargetBackpressureInfo info = this.getBackpressureInfoBySourceSpout(spout, componentId, false);
                SourceBackpressureInfo sourceInfo = this.sourceToBackpressureInfo.get(spout);
                if (info != null) {
                    Set<Integer> tasks = info.getTasks();
                    if (tasks != null && tasks.remove(sourceTask)) {
                        update = true;
                    }
                }
                if (sourceInfo == null || !this.checkIntervalExpired(sourceInfo.getLastestTimeStamp())) {
                    continue;
                }
                notifyList.addAll(this.context.getComponentTasks(spout));
                // The spout may be throttled on behalf of another component only, in
                // which case there is no state recorded for this bolt to update.
                if (info != null) {
                    info.setTimeStamp(System.currentTimeMillis());
                    info.setBackpressureStatus(type);
                }
            }
            if (!this.checkSpoutsUnderBackpressure(inputSpouts)) {
                notifyList.add(sourceTask);
            }
            values = new Values(new TopoMasterCtrlEvent(TopoMasterCtrlEvent.EventType.stopBackpressure, null));
        } else {
            LOG.warn("Received unknown event " + type);
            return;
        }
        this.sendBackpressureMessage(notifyList, values, type);
        if (update) {
            LOG.info("inputspouts=" + inputSpouts + " for " + componentId + "-" + sourceTask + ", eventType=" + type.toString());
            this.reportBackpressureStatus();
        }
    }

    /**
     * Decides whether a start event reported for a spout should be propagated.
     *
     * @return {@code true} when the spout's latest timestamp is older than
     *         {@code period}; otherwise {@code true} when its latest event is set and
     *         is not a start event, or when {@code flowCtrlTime} is non-negative and
     *         either exceeds the recorded maximum by more than {@link #ADJUSTED_TIME}
     *         or that maximum is {@code -1}
     */
    private boolean shouldPropagateStart(SourceBackpressureInfo sourceInfo, int flowCtrlTime) {
        if (System.currentTimeMillis() - sourceInfo.getLastestTimeStamp() > (long)this.period) {
            return true;
        }
        TopoMasterCtrlEvent.EventType lastestBpEvent = sourceInfo.getLastestBackpressureEvent();
        boolean add = lastestBpEvent != null && !lastestBpEvent.equals(TopoMasterCtrlEvent.EventType.startBackpressure);
        int maxFlowCtrlTime = sourceInfo.getMaxFlowCtrlTime();
        if ((flowCtrlTime - maxFlowCtrlTime > ADJUSTED_TIME || maxFlowCtrlTime == -1) && flowCtrlTime >= 0) {
            add = true;
        }
        return add;
    }

    /**
     * @return the tasks of every spout with a non-empty task set together with the
     *         tasks recorded for that spout's target components
     */
    private Set<Integer> getTasksUnderBackpressure() {
        Set<Integer> ret = new HashSet<Integer>();
        for (SourceBackpressureInfo sourceInfo : this.sourceToBackpressureInfo.values()) {
            if (sourceInfo.getTasks().isEmpty()) {
                continue;
            }
            ret.addAll(sourceInfo.getTasks());
            for (TargetBackpressureInfo targetInfo : sourceInfo.getTargetTasks().values()) {
                ret.addAll(targetInfo.getTasks());
            }
        }
        return ret;
    }

    /**
     * Reports the current status as a task error entry, stores the state map through
     * the cluster state and logs the status line. Failures are logged and not
     * propagated.
     */
    private void reportBackpressureStatus() {
        try {
            StringBuilder stringBuilder = new StringBuilder();
            Set<Integer> underTasks = this.getTasksUnderBackpressure();
            stringBuilder.append(BACKPRESSURE_TAG);
            if (underTasks.isEmpty()) {
                stringBuilder.append("closed ");
            } else {
                stringBuilder.append("opened: ");
                stringBuilder.append(underTasks);
            }
            this.zkCluster.report_task_error(this.context.getTopologyId(), this.context.getThisTaskId(), stringBuilder.toString(), "warn", 110, 1800, BACKPRESSURE_TAG);
            this.zkCluster.set_backpressure_info(this.context.getTopologyId(), this.sourceToBackpressureInfo);
            LOG.info(stringBuilder.toString());
        } catch (Exception e) {
            LOG.error("can't update backpressure state ", e);
        }
    }
}

