/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task.backpressure;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.backpressure.Backpressure;
import com.alibaba.jstorm.task.backpressure.SourceBackpressureInfo;
import com.alibaba.jstorm.task.backpressure.TargetBackpressureInfo;
import com.alibaba.jstorm.task.execute.BoltCollector;
import com.alibaba.jstorm.task.execute.BoltExecutors;
import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
import com.alibaba.jstorm.utils.IntervalCheck;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BackpressureTrigger
extends Backpressure {
    private static final Logger LOG = LoggerFactory.getLogger(BackpressureTrigger.class);
    private Task task;
    private int taskId;
    private DisruptorQueue exeQueue;
    private DisruptorQueue recvQueue;
    private BoltExecutors boltExecutor;
    private volatile boolean isUnderBackpressure = false;
    private IntervalCheck intervalCheck;
    BoltCollector output;
    private List<TopoMasterCtrlEvent.EventType> samplingSet;
    private double triggerSampleRate;

    public BackpressureTrigger(Task task, BoltExecutors boltExecutor, Map stormConf, BoltCollector output) {
        super(stormConf);
        this.task = task;
        this.taskId = task.getTaskId();
        int sampleNum = ConfigExtension.getBackpressureTriggerSampleNumber(stormConf);
        int smapleInterval = sampleNum * ConfigExtension.getBackpressureCheckIntervl(stormConf);
        this.intervalCheck = new IntervalCheck();
        this.intervalCheck.setIntervalMs(smapleInterval);
        this.intervalCheck.start();
        this.samplingSet = new ArrayList<TopoMasterCtrlEvent.EventType>();
        this.triggerSampleRate = ConfigExtension.getBackpressureTriggerSampleRate(stormConf);
        this.output = output;
        this.boltExecutor = boltExecutor;
        try {
            StormClusterState zkCluster = task.getZkCluster();
            Map<String, SourceBackpressureInfo> backpressureInfo = zkCluster.get_backpressure_info(task.getTopologyId());
            if (backpressureInfo != null) {
                for (Map.Entry<String, SourceBackpressureInfo> entry : backpressureInfo.entrySet()) {
                    TargetBackpressureInfo targetInfo;
                    SourceBackpressureInfo info = entry.getValue();
                    Map<String, TargetBackpressureInfo> targetInfoMap = info.getTargetTasks();
                    if (targetInfoMap == null || (targetInfo = targetInfoMap.get(task.getComponentId())) == null || !targetInfo.getTasks().contains(this.taskId)) continue;
                    this.isBackpressureEnable = true;
                    LOG.info("Retrieved backpressure info for task-" + this.taskId);
                }
            }
        }
        catch (Exception e) {
            LOG.info("Failed to get backpressure info from zk", (Throwable)e);
        }
        LOG.info("Finished BackpressureTrigger init, highWaterMark=" + this.highWaterMark + ", lowWaterMark=" + this.lowWaterMark + ", sendInterval=" + this.intervalCheck.getInterval());
    }

    public void checkAndTrigger() {
        if (!this.isBackpressureEnable) {
            return;
        }
        if (this.exeQueue == null || this.recvQueue == null) {
            this.exeQueue = this.task.getExecuteQueue();
            this.recvQueue = this.task.getDeserializeQueue();
            if (this.exeQueue == null) {
                LOG.info("Init of excutor-task-" + this.taskId + " has not been finished!");
                return;
            }
            if (this.recvQueue == null) {
                LOG.info("Init of receiver-task-" + this.taskId + " has not been finished!");
                return;
            }
        }
        LOG.debug("Backpressure Check: exeQueue load=" + this.exeQueue.pctFull() * 100.0f + ", recvQueue load=" + this.recvQueue.pctFull() * 100.0f);
        if (this.exeQueue.pctFull() > (float)this.highWaterMark) {
            this.samplingSet.add(TopoMasterCtrlEvent.EventType.startBackpressure);
        } else if ((double)this.exeQueue.pctFull() <= this.lowWaterMark) {
            this.samplingSet.add(TopoMasterCtrlEvent.EventType.stopBackpressure);
        } else {
            this.samplingSet.add(TopoMasterCtrlEvent.EventType.defaultType);
        }
        if (this.intervalCheck.check()) {
            int startCount = 0;
            int stopCount = 0;
            for (TopoMasterCtrlEvent.EventType eventType : this.samplingSet) {
                if (eventType.equals((Object)TopoMasterCtrlEvent.EventType.startBackpressure)) {
                    ++startCount;
                    continue;
                }
                if (!eventType.equals((Object)TopoMasterCtrlEvent.EventType.stopBackpressure)) continue;
                ++stopCount;
            }
            if (startCount > stopCount) {
                if (this.sampleRateCheck(startCount)) {
                    this.startBackpressure();
                    this.isUnderBackpressure = true;
                }
            } else if (this.sampleRateCheck(stopCount) && this.isUnderBackpressure) {
                this.stopBackpressure();
            }
            this.samplingSet.clear();
        }
    }

    private boolean sampleRateCheck(double count) {
        double sampleRate = count / (double)this.samplingSet.size();
        return sampleRate >= this.triggerSampleRate;
    }

    public void handle(Tuple input) {
        try {
            TopoMasterCtrlEvent event = (TopoMasterCtrlEvent)input.getValueByField("ctrlEvent");
            TopoMasterCtrlEvent.EventType type = event.getEventType();
            if (type.equals((Object)TopoMasterCtrlEvent.EventType.stopBackpressure)) {
                this.isUnderBackpressure = false;
                LOG.debug("Received stop backpressure event for task-" + this.task.getTaskId());
            } else if (type.equals((Object)TopoMasterCtrlEvent.EventType.updateBackpressureConfig)) {
                Map stormConf = (Map)event.getEventValue().get(0);
                this.updateConfig(stormConf);
                if (!this.isBackpressureEnable) {
                    LOG.info("Disable backpressure in trigger.");
                    this.isUnderBackpressure = false;
                    this.samplingSet.clear();
                } else {
                    LOG.info("Enable backpressure in trigger.");
                }
            } else {
                LOG.info("Received unexpected event, " + type.toString());
            }
        }
        catch (Exception e) {
            LOG.error("Failed to handle event", (Throwable)e);
        }
    }

    private void startBackpressure() {
        ArrayList<Object> value = new ArrayList<Object>();
        Double flowCtrlTime = this.boltExecutor.getExecuteTime();
        value.add(flowCtrlTime.intValue());
        TopoMasterCtrlEvent startBp = new TopoMasterCtrlEvent(TopoMasterCtrlEvent.EventType.startBackpressure, value);
        this.output.emitCtrl("__master_control_stream", null, new Values(startBp));
        LOG.debug("Send start backpressure request for task-{}, flowCtrlTime={}", (Object)this.taskId, (Object)flowCtrlTime.intValue());
    }

    private void stopBackpressure() {
        TopoMasterCtrlEvent stopBp = new TopoMasterCtrlEvent(TopoMasterCtrlEvent.EventType.stopBackpressure, null);
        this.output.emitCtrl("__master_control_stream", null, new Values(stopBp));
        LOG.debug("Send stop backpressure request for task-{}", (Object)this.taskId);
    }
}

