package com.alibaba.jstorm.task.backpressure;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.execute.BoltCollector;
import com.alibaba.jstorm.task.execute.BoltExecutors;
import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
import com.alibaba.jstorm.task.master.TopologyMaster;
import com.alibaba.jstorm.utils.IntervalCheck;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/task/backpressure/BackpressureTrigger.class */
/**
 * Watches the fill level of a task's execute queue and asks the topology master to start or stop
 * backpressure for that task.
 *
 * <p>Each call to {@link #checkAndTrigger()} records one sample (start / stop / neutral) based on
 * the execute queue load compared with the inherited high and low water marks. Whenever the
 * configured interval elapses, the collected samples are evaluated: the dominant vote is acted on
 * only if its share of all samples reaches the configured trigger sample rate. Requests are sent as
 * {@link TopoMasterCtrlEvent}s on {@link Common#TOPOLOGY_MASTER_CONTROL_STREAM_ID}.
 *
 * <p>NOTE(review): no synchronization is visible in this class besides the {@code volatile} flag;
 * {@code samplingSet} is touched by both {@link #checkAndTrigger()} and {@link #handle(Tuple)} —
 * confirm that both are invoked from the same thread.
 */
public class BackpressureTrigger extends Backpressure {
    private static final Logger LOG = LoggerFactory.getLogger(BackpressureTrigger.class);

    private final Task task;
    private final int taskId;
    private final BoltExecutors boltExecutor;
    /** Decides when the collected samples are evaluated. */
    private final IntervalCheck intervalCheck;
    /** One entry per {@link #checkAndTrigger()} call since the last evaluation. */
    private final List<TopoMasterCtrlEvent.EventType> samplingSet;
    /** Minimum share of samples a vote needs before a request is sent. */
    private final double triggerSampleRate;

    /** Resolved lazily from the task in {@link #checkAndTrigger()}; may be null until then. */
    private DisruptorQueue exeQueue;
    /** Resolved lazily from the task in {@link #checkAndTrigger()}; only used for debug logging. */
    private DisruptorQueue recvQueue;

    /**
     * Set once a start request has been sent; cleared in {@link #handle(Tuple)} when a stop event
     * arrives or backpressure is disabled by a config update.
     */
    private volatile boolean isUnderBackpressure = false;

    BoltCollector output;

    /**
     * Creates the trigger and enables it if the backpressure info stored for the topology lists
     * this task as a target.
     *
     * @param task          the task whose queues are monitored
     * @param boltExecutors executor whose execute time is reported with start requests
     * @param map           topology configuration, also passed to the superclass
     * @param boltCollector collector used to emit control events
     */
    public BackpressureTrigger(Task task, BoltExecutors boltExecutors, Map map, BoltCollector boltCollector) {
        super(map);
        this.task = task;
        this.taskId = task.getTaskId().intValue();

        // Evaluation window = number of samples per decision * interval between two checks.
        int sampleWindow = ConfigExtension.getBackpressureTriggerSampleNumber(map)
                * ConfigExtension.getBackpressureCheckIntervl(map);
        this.intervalCheck = new IntervalCheck();
        this.intervalCheck.setIntervalMs(sampleWindow);
        this.intervalCheck.start();

        this.samplingSet = new ArrayList<>();
        this.triggerSampleRate = ConfigExtension.getBackpressureTriggerSampleRate(map);
        this.output = boltCollector;
        this.boltExecutor = boltExecutors;

        try {
            Map<String, SourceBackpressureInfo> backpressureInfo =
                    task.getZkCluster().get_backpressure_info(task.getTopologyId());
            if (backpressureInfo != null) {
                for (SourceBackpressureInfo sourceInfo : backpressureInfo.values()) {
                    Map<String, TargetBackpressureInfo> targetTasks = sourceInfo.getTargetTasks();
                    if (targetTasks == null) {
                        continue;
                    }
                    TargetBackpressureInfo targetInfo = targetTasks.get(task.getComponentId());
                    if (targetInfo != null && targetInfo.getTasks().contains(this.taskId)) {
                        this.isBackpressureEnable = true;
                        LOG.info("Retrieved backpressure info for task-" + this.taskId);
                        // One match is enough; further matches would only repeat the same assignment.
                        break;
                    }
                }
            }
        } catch (Exception e) {
            // Best effort: construction continues and the enable flag keeps its current value.
            LOG.warn("Failed to get backpressure info from zk", e);
        }
        LOG.info("Finished BackpressureTrigger init, highWaterMark=" + this.highWaterMark
                + ", lowWaterMark=" + this.lowWaterMark
                + ", sendInterval=" + this.intervalCheck.getInterval());
    }

    /**
     * Records one load sample and, when the interval has elapsed, evaluates all samples collected
     * since the previous evaluation and emits a start or stop request if warranted.
     *
     * <p>Does nothing while backpressure is disabled or while either task queue is not yet
     * available.
     */
    public void checkAndTrigger() {
        if (!this.isBackpressureEnable) {
            return;
        }

        if (this.exeQueue == null || this.recvQueue == null) {
            this.exeQueue = this.task.getExecuteQueue();
            this.recvQueue = this.task.getDeserializeQueue();
            if (this.exeQueue == null) {
                LOG.info("Init of excutor-task-" + this.taskId + " has not been finished!");
                return;
            }
            if (this.recvQueue == null) {
                LOG.info("Init of receiver-task-" + this.taskId + " has not been finished!");
                return;
            }
        }

        // Read the load once so that the log line and both threshold checks see the same value.
        final float exeLoad = this.exeQueue.pctFull();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Backpressure Check: exeQueue load={}, recvQueue load={}",
                    exeLoad * 100.0f, this.recvQueue.pctFull() * 100.0f);
        }

        if (exeLoad > ((float) this.highWaterMark)) {
            this.samplingSet.add(TopoMasterCtrlEvent.EventType.startBackpressure);
        } else if (exeLoad <= this.lowWaterMark) {
            this.samplingSet.add(TopoMasterCtrlEvent.EventType.stopBackpressure);
        } else {
            // Between the water marks: counts toward the sample total but votes for neither side.
            this.samplingSet.add(TopoMasterCtrlEvent.EventType.defaultType);
        }

        if (!this.intervalCheck.check()) {
            return;
        }

        int startVotes = 0;
        int stopVotes = 0;
        for (TopoMasterCtrlEvent.EventType sample : this.samplingSet) {
            if (sample == TopoMasterCtrlEvent.EventType.startBackpressure) {
                startVotes++;
            } else if (sample == TopoMasterCtrlEvent.EventType.stopBackpressure) {
                stopVotes++;
            }
        }

        if (startVotes > stopVotes) {
            if (sampleRateCheck(startVotes)) {
                startBackpressure();
                this.isUnderBackpressure = true;
            }
        } else if (sampleRateCheck(stopVotes) && this.isUnderBackpressure) {
            // The flag is not cleared here; handle() clears it when the stop event is received.
            stopBackpressure();
        }
        this.samplingSet.clear();
    }

    /**
     * Tells whether the given vote count makes up at least {@code triggerSampleRate} of all
     * samples currently collected. Called only after at least one sample has been added.
     */
    private boolean sampleRateCheck(double votes) {
        return votes / ((double) this.samplingSet.size()) >= this.triggerSampleRate;
    }

    /**
     * Processes a control tuple carrying a {@link TopoMasterCtrlEvent}.
     *
     * <p>A stop event clears the under-backpressure flag; a config update is forwarded to
     * {@code updateConfig} and, if backpressure ends up disabled, the flag and the collected
     * samples are reset. Any other event type is logged. Exceptions are logged, not propagated.
     *
     * @param tuple tuple whose {@link TopologyMaster#FILED_CTRL_EVENT} field holds the event
     */
    public void handle(Tuple tuple) {
        try {
            TopoMasterCtrlEvent event =
                    (TopoMasterCtrlEvent) tuple.getValueByField(TopologyMaster.FILED_CTRL_EVENT);
            TopoMasterCtrlEvent.EventType eventType = event.getEventType();
            if (eventType == TopoMasterCtrlEvent.EventType.stopBackpressure) {
                this.isUnderBackpressure = false;
                LOG.debug("Received stop backpressure event for task-{}", this.task.getTaskId());
            } else if (eventType == TopoMasterCtrlEvent.EventType.updateBackpressureConfig) {
                // The first event value is expected to be the new configuration map.
                updateConfig((Map) event.getEventValue().get(0));
                if (this.isBackpressureEnable) {
                    LOG.info("Enable backpressure in trigger.");
                } else {
                    LOG.info("Disable backpressure in trigger.");
                    this.isUnderBackpressure = false;
                    this.samplingSet.clear();
                }
            } else {
                LOG.info("Received unexpected event, " + eventType);
            }
        } catch (Exception e) {
            LOG.error("Failed to handle event", e);
        }
    }

    /**
     * Emits a start-backpressure request carrying the bolt executor's execute time, truncated to
     * an int, as the flow-control time.
     */
    private void startBackpressure() {
        int flowCtrlTime = Double.valueOf(this.boltExecutor.getExecuteTime()).intValue();
        List<Object> eventValue = new ArrayList<>();
        eventValue.add(flowCtrlTime);
        TopoMasterCtrlEvent startEvent =
                new TopoMasterCtrlEvent(TopoMasterCtrlEvent.EventType.startBackpressure, eventValue);
        this.output.emitCtrl(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, null, new Values(startEvent));
        LOG.debug("Send start backpressure request for task-{}, flowCtrlTime={}", this.taskId, flowCtrlTime);
    }

    /** Emits a stop-backpressure request with no event value. */
    private void stopBackpressure() {
        TopoMasterCtrlEvent stopEvent =
                new TopoMasterCtrlEvent(TopoMasterCtrlEvent.EventType.stopBackpressure, null);
        this.output.emitCtrl(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, null, new Values(stopEvent));
        LOG.debug("Send stop backpressure request for task-{}", this.taskId);
    }
}
