/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task.backpressure;

import backtype.storm.tuple.Values;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.task.backpressure.Backpressure;
import com.alibaba.jstorm.task.execute.spout.SpoutCollector;
import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spout-side backpressure controller.
 *
 * <p>Reacts to {@link TopoMasterCtrlEvent}s by adjusting the inherited {@code sleepTime}
 * and the population threshold ({@code queueSizeReduced}) of the controlled
 * {@link DisruptorQueue}. {@link #flowControl()} then sleeps for {@code sleepTime}
 * milliseconds and waits until the queue population drops below that threshold.
 *
 * <p>NOTE(review): no field in this class is synchronized or volatile; confirm how
 * {@link #control(TopoMasterCtrlEvent)} and {@link #flowControl()} are threaded by callers.
 */
public class BackpressureController extends Backpressure {
    private static final Logger LOG = LoggerFactory.getLogger(BackpressureController.class);

    /** While in backpressure mode the queue threshold is totalQueueSize divided by this value. */
    private static final int QUEUE_SIZE_REDUCTION_DIVISOR = 100;
    /** Lower limit for the reduced queue threshold. */
    private static final int MIN_REDUCED_QUEUE_SIZE = 10;

    private final int taskId;
    private final DisruptorQueue queueControlled;
    private final int totalQueueSize;
    /** Population threshold checked by {@link #isQueueCapacityAvailable()}. */
    private int queueSizeReduced;
    private boolean isBackpressureMode = false;
    private SpoutCollector spoutCollector;
    /** Largest sleepTime reached by {@link #start(int)} since the last reset. */
    private long maxBound;
    /** Last non-zero sleepTime produced by {@link #stop()} since the last reset. */
    private long minBound;

    /**
     * @param conf      configuration handed to the {@link Backpressure} super constructor
     * @param taskId    id of the task, used in log messages only
     * @param queue     queue whose population is compared against the threshold
     * @param queueSize full queue size; the threshold used when not in backpressure mode
     */
    public BackpressureController(Map conf, int taskId, DisruptorQueue queue, int queueSize) {
        super(conf);
        this.queueControlled = queue;
        this.totalQueueSize = queueSize;
        this.queueSizeReduced = queueSize;
        this.taskId = taskId;
        this.maxBound = 0L;
        this.minBound = 0L;
    }

    /**
     * Sets the collector used by {@link #stop()} to emit the stop-backpressure control event.
     *
     * @param spoutCollector collector to emit control tuples through
     */
    public void setSpoutCollector(SpoutCollector spoutCollector) {
        this.spoutCollector = spoutCollector;
    }

    /**
     * Dispatches a control event: starts or stops backpressure, or applies a new configuration.
     * Does nothing when backpressure is disabled.
     *
     * <p>NOTE(review): because of the early return, an {@code updateBackpressureConfig} event
     * cannot re-enable backpressure through this method once it has been disabled; confirm
     * this is intended.
     *
     * @param ctrlEvent the event to handle
     */
    public void control(TopoMasterCtrlEvent ctrlEvent) {
        if (!this.isBackpressureEnable) {
            return;
        }
        TopoMasterCtrlEvent.EventType eventType = ctrlEvent.getEventType();
        LOG.debug("Received control event, {}", eventType);

        if (eventType == TopoMasterCtrlEvent.EventType.startBackpressure) {
            this.start(extractFlowCtrlTime(ctrlEvent.getEventValue()));
        } else if (eventType == TopoMasterCtrlEvent.EventType.stopBackpressure) {
            this.stop();
        } else if (eventType == TopoMasterCtrlEvent.EventType.updateBackpressureConfig) {
            List<Object> value = ctrlEvent.getEventValue();
            if (value == null) {
                return;
            }
            if (value.isEmpty()) {
                // Nothing to apply; avoid an IndexOutOfBoundsException on get(0).
                LOG.warn("Ignoring updateBackpressureConfig event with empty value");
                return;
            }
            Map stormConf = (Map) value.get(0);
            this.updateConfig(stormConf);
            if (!this.isBackpressureEnable) {
                LOG.info("Disable backpressure in controller.");
                this.resetBackpressureInfo();
            } else {
                LOG.info("Enable backpressure in controller");
            }
        }
    }

    /**
     * Reads the flow-control time from the first element of a start event's value.
     *
     * @param eventValue value list of the event; may be null or empty
     * @return the first element as an int, or 0 when the list or the element is missing
     */
    private static int extractFlowCtrlTime(List<Object> eventValue) {
        if (eventValue == null || eventValue.isEmpty()) {
            return 0;
        }
        Object first = eventValue.get(0);
        return first != null ? (Integer) first : 0;
    }

    /**
     * Sleeps for {@code sleepTime} milliseconds, then polls in 1 ms steps until
     * {@link #isQueueCapacityAvailable()} is true. Does nothing when backpressure is disabled.
     * If the thread is interrupted, waiting stops and the interrupt status is restored.
     */
    public void flowControl() {
        if (!this.isBackpressureEnable) {
            return;
        }
        try {
            Thread.sleep(this.sleepTime);
            while (!this.isQueueCapacityAvailable()) {
                Thread.sleep(1L);
            }
        } catch (InterruptedException e) {
            LOG.error("Sleep was interrupted!");
            // Restore the flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /** Clears sleep time and bounds, restores the full queue threshold and leaves backpressure mode. */
    private void resetBackpressureInfo() {
        this.sleepTime = 0L;
        this.maxBound = 0L;
        this.minBound = 0L;
        this.queueSizeReduced = this.totalQueueSize;
        this.isBackpressureMode = false;
    }

    /**
     * Enters backpressure mode: computes a new {@code sleepTime}, raises {@code maxBound}
     * if needed, and shrinks the queue threshold.
     *
     * <p>{@code JStormUtils.halfValueOfSum} presumably returns half of the sum of its first
     * two arguments, rounded according to the boolean flag - confirm in JStormUtils.
     *
     * @param flowCtrlTime flow-control time carried by the start event; values {@code <= 0}
     *                     only increment the current sleep time by one
     */
    private void start(int flowCtrlTime) {
        if (flowCtrlTime <= 0) {
            this.sleepTime++;
        } else if (this.maxBound < (long) flowCtrlTime) {
            // Requested time exceeds everything seen so far: adopt it directly.
            this.sleepTime = (long) flowCtrlTime;
        } else if (this.maxBound == (long) flowCtrlTime) {
            if (this.sleepTime >= this.maxBound) {
                this.sleepTime++;
            } else {
                this.sleepTime = JStormUtils.halfValueOfSum(flowCtrlTime, this.sleepTime, true);
            }
        } else if (this.maxBound <= this.sleepTime) {
            // maxBound > flowCtrlTime and sleepTime is already at or above maxBound.
            this.sleepTime++;
        } else if (this.sleepTime >= (long) flowCtrlTime) {
            // flowCtrlTime <= sleepTime < maxBound: move toward maxBound.
            this.sleepTime = JStormUtils.halfValueOfSum(this.maxBound, this.sleepTime, true);
        } else {
            // sleepTime < flowCtrlTime < maxBound: move toward flowCtrlTime.
            this.sleepTime = JStormUtils.halfValueOfSum(flowCtrlTime, this.sleepTime, true);
        }

        if (this.sleepTime > this.maxBound) {
            this.maxBound = this.sleepTime;
        }
        this.queueSizeReduced =
                Math.max(this.totalQueueSize / QUEUE_SIZE_REDUCTION_DIVISOR, MIN_REDUCED_QUEUE_SIZE);
        this.isBackpressureMode = true;
        LOG.debug("Start backpressure at spout-{}, sleepTime={}, queueSizeReduced={}, flowCtrlTime={}",
                new Object[]{this.taskId, this.sleepTime, this.queueSizeReduced, flowCtrlTime});
    }

    /**
     * Lowers {@code sleepTime} toward {@code minBound}. When it reaches 0 the controller is
     * reset and a {@code stopBackpressure} event is emitted on the master control stream;
     * otherwise the new sleep time becomes {@code minBound}.
     */
    private void stop() {
        if (this.sleepTime == this.minBound) {
            // Already at the lower bound: drop the bound so the sleep time can keep shrinking.
            this.minBound = 0L;
        }
        this.sleepTime = JStormUtils.halfValueOfSum(this.minBound, this.sleepTime, false);
        if (this.sleepTime == 0L) {
            this.resetBackpressureInfo();
            if (this.spoutCollector != null) {
                TopoMasterCtrlEvent stopBp =
                        new TopoMasterCtrlEvent(TopoMasterCtrlEvent.EventType.stopBackpressure, null);
                this.spoutCollector.emitCtrl("__master_control_stream", new Values(stopBp), null);
            } else {
                // The collector is injected via setSpoutCollector and may not be set yet.
                LOG.warn("Spout collector is not set, unable to emit stopBackpressure event");
            }
        } else {
            this.minBound = this.sleepTime;
        }
        LOG.debug("Stop backpressure at spout-{}, sleepTime={}, queueSizeReduced={}",
                new Object[]{this.taskId, this.sleepTime, this.queueSizeReduced});
    }

    /**
     * @return true when backpressure is enabled and the controller is currently in
     *         backpressure mode
     */
    public boolean isBackpressureMode() {
        return this.isBackpressureMode && this.isBackpressureEnable;
    }

    /**
     * @return true when the controlled queue's population is below the current threshold
     */
    public boolean isQueueCapacityAvailable() {
        return this.queueControlled.population() < (long) this.queueSizeReduced;
    }
}

