/*
 * Decompiled with CFR 0.152.
 */
package org.apache.helix.controller;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.helix.ConfigChangeListener;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HealthStateChangeListener;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.IdealStateChangeListener;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.MessageListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.PipelineRegistry;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.CompatibilityCheckStage;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ExternalViewComputeStage;
import org.apache.helix.controller.stages.MessageGenerationPhase;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.controller.stages.TaskAssignmentStage;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HealthStat;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;

/**
 * Cluster controller that turns Helix change notifications into {@link ClusterEvent}s and
 * runs the pipelines registered for each event name in a {@link PipelineRegistry}.
 *
 * <p>Pipelines are executed inside {@link #handleEvent(ClusterEvent)}, which is
 * {@code synchronized}, so at most one event is processed at a time per controller
 * instance. Events can originate from listener callbacks as well as from the
 * {@link Timer} thread that drives the periodic {@link RebalanceTask}.
 *
 * <p>External-view and health-state notifications are accepted but ignored.
 */
public class GenericHelixController
        implements ConfigChangeListener,
        IdealStateChangeListener,
        LiveInstanceChangeListener,
        MessageListener,
        CurrentStateChangeListener,
        ExternalViewChangeListener,
        ControllerChangeListener,
        HealthStateChangeListener {
    private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName());

    volatile boolean init = false;

    /** Maps event names to the ordered list of pipelines to run for them. */
    private final PipelineRegistry _registry;

    /** Session ids for which a current-state listener has already been registered. */
    private final Set<String> _instanceCurrentStateChangeSubscriptionSessionIds;

    /** Instance names for which a message listener has already been registered. */
    private final Set<String> _instanceSubscriptionNames;

    /** Created on the first event that carries a change context; reset and cleared on FINALIZE. */
    ClusterStatusMonitor _clusterStatusMonitor;

    /**
     * Whether event processing is suspended. Written by {@link #onControllerChange} without
     * holding the monitor and read by {@link #handleEvent}, which the rebalance timer thread
     * also calls, hence volatile.
     */
    private volatile boolean _paused = false;

    /** Timer driving periodic rebalance events, or {@code null} when none is running. */
    Timer _rebalanceTimer = null;

    /** Period of the running timer; {@code Integer.MAX_VALUE} means no timer is scheduled. */
    int _timerPeriod = Integer.MAX_VALUE;

    /**
     * Creates a controller backed by the default pipeline registry.
     */
    public GenericHelixController() {
        this(createDefaultRegistry());
    }

    /**
     * Creates a controller that dispatches events through the given registry.
     *
     * @param registry registry consulted for the pipelines of each event name
     */
    public GenericHelixController(PipelineRegistry registry) {
        _registry = registry;
        _instanceCurrentStateChangeSubscriptionSessionIds = new ConcurrentSkipListSet<String>();
        _instanceSubscriptionNames = new ConcurrentSkipListSet<String>();
    }

    /**
     * Starts (or restarts) the periodic rebalance timer if {@code period} is shorter than
     * the period of the timer currently running; otherwise the existing timer is kept.
     *
     * @param period  timer period, used both as the initial delay and as the repeat
     *                interval passed to {@link Timer#scheduleAtFixedRate}
     * @param manager manager handed to the scheduled {@link RebalanceTask}
     * @throws IllegalArgumentException if {@code period} is not positive
     */
    void startRebalancingTimer(int period, HelixManager manager) {
        // Validate up front: Timer.scheduleAtFixedRate rejects non-positive periods, and
        // failing there would leave the old timer cancelled and _timerPeriod overwritten.
        if (period <= 0) {
            throw new IllegalArgumentException("Rebalance timer period must be positive: " + period);
        }
        logger.info("Controller starting timer at period " + period);
        if (period < _timerPeriod) {
            if (_rebalanceTimer != null) {
                _rebalanceTimer.cancel();
            }
            _rebalanceTimer = new Timer(true);
            _timerPeriod = period;
            _rebalanceTimer.scheduleAtFixedRate(new RebalanceTask(manager), _timerPeriod, _timerPeriod);
        } else {
            logger.info("Controller already has timer at period " + _timerPeriod);
        }
    }

    /**
     * Cancels the periodic rebalance timer, if any, and resets the recorded period so
     * that a later {@link #startRebalancingTimer} call with any positive period starts
     * a new timer.
     */
    void stopRebalancingTimer() {
        if (_rebalanceTimer != null) {
            _rebalanceTimer.cancel();
            _rebalanceTimer = null;
        }
        _timerPeriod = Integer.MAX_VALUE;
    }

    /**
     * Builds the default registry. Every event first runs the data-refresh pipeline;
     * most then run the rebalance pipeline, and some additionally run the
     * live-instance compatibility check and/or the external-view pipeline.
     *
     * @return a newly populated registry
     */
    private static PipelineRegistry createDefaultRegistry() {
        logger.info("createDefaultRegistry");
        synchronized (GenericHelixController.class) {
            PipelineRegistry registry = new PipelineRegistry();

            Pipeline dataRefresh = new Pipeline();
            dataRefresh.addStage(new ReadClusterDataStage());

            Pipeline rebalancePipeline = new Pipeline();
            rebalancePipeline.addStage(new ResourceComputationStage());
            rebalancePipeline.addStage(new CurrentStateComputationStage());
            rebalancePipeline.addStage(new BestPossibleStateCalcStage());
            rebalancePipeline.addStage(new MessageGenerationPhase());
            rebalancePipeline.addStage(new MessageSelectionStage());
            rebalancePipeline.addStage(new MessageThrottleStage());
            rebalancePipeline.addStage(new TaskAssignmentStage());

            Pipeline externalViewPipeline = new Pipeline();
            externalViewPipeline.addStage(new ExternalViewComputeStage());

            Pipeline liveInstancePipeline = new Pipeline();
            liveInstancePipeline.addStage(new CompatibilityCheckStage());

            registry.register("idealStateChange", dataRefresh, rebalancePipeline);
            registry.register("currentStateChange", dataRefresh, rebalancePipeline, externalViewPipeline);
            registry.register("configChange", dataRefresh, rebalancePipeline);
            registry.register("liveInstanceChange", dataRefresh, liveInstancePipeline, rebalancePipeline, externalViewPipeline);
            registry.register("messageChange", dataRefresh, rebalancePipeline);
            registry.register("externalView", dataRefresh);
            registry.register("resume", dataRefresh, rebalancePipeline, externalViewPipeline);
            registry.register("periodicalRebalance", dataRefresh, rebalancePipeline, externalViewPipeline);
            return registry;
        }
    }

    /**
     * Runs the pipelines registered for {@code event}.
     *
     * <p>The event is dropped (with a log entry) when it carries no manager, when the
     * manager is not the leader, or when the controller is paused. A FINALIZE change
     * context resets the status monitor, stops the rebalance timer and skips the
     * pipelines. If a pipeline throws, the remaining pipelines for the event are skipped.
     *
     * @param event event carrying at least the {@code "helixmanager"} attribute
     */
    protected synchronized void handleEvent(ClusterEvent event) {
        HelixManager manager = (HelixManager) event.getAttribute("helixmanager");
        if (manager == null) {
            logger.error("No cluster manager in event:" + event.getName());
            return;
        }
        if (!manager.isLeader()) {
            logger.error("Cluster manager: " + manager.getInstanceName() + " is not leader. Pipeline will not be invoked");
            return;
        }
        if (_paused) {
            logger.info("Cluster is paused. Ignoring the event:" + event.getName());
            return;
        }

        NotificationContext context = (NotificationContext) event.getAttribute("changeContext");
        if (context != null) {
            if (context.getType() == NotificationContext.Type.FINALIZE) {
                if (_clusterStatusMonitor != null) {
                    _clusterStatusMonitor.reset();
                    _clusterStatusMonitor = null;
                }
                stopRebalancingTimer();
                logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getName());
                return;
            }
            // Lazily create the monitor on the first non-FINALIZE event with a context.
            if (_clusterStatusMonitor == null) {
                _clusterStatusMonitor = new ClusterStatusMonitor(manager.getClusterName());
            }
            event.addAttribute("clusterStatusMonitor", _clusterStatusMonitor);
        }

        List<Pipeline> pipelines = _registry.getPipelinesForEvent(event.getName());
        if (pipelines == null || pipelines.isEmpty()) {
            logger.info("No pipeline to run for event:" + event.getName());
            return;
        }
        for (Pipeline pipeline : pipelines) {
            try {
                pipeline.handle(event);
                pipeline.finish();
            } catch (Exception e) {
                logger.error("Exception while executing pipeline: " + pipeline + ". Will not continue to next pipeline", e);
                break;
            }
        }
    }

    /** Intentionally a no-op: external-view notifications do not trigger any pipeline here. */
    @Override
    public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
    }

    /**
     * Dispatches a {@code "currentStateChange"} event for the given instance.
     *
     * @param instanceName  instance whose current state changed
     * @param statesInfo    current-state records, attached as the event data
     * @param changeContext notification context supplying the manager
     */
    @Override
    public void onStateChange(String instanceName, List<CurrentState> statesInfo, NotificationContext changeContext) {
        logger.info("START: GenericClusterController.onStateChange()");
        ClusterEvent event = new ClusterEvent("currentStateChange");
        event.addAttribute("helixmanager", changeContext.getManager());
        event.addAttribute("instanceName", instanceName);
        event.addAttribute("changeContext", changeContext);
        event.addAttribute("eventData", statesInfo);
        handleEvent(event);
        logger.info("END: GenericClusterController.onStateChange()");
    }

    /** Intentionally a no-op: health reports do not trigger any pipeline here. */
    @Override
    public void onHealthChange(String instanceName, List<HealthStat> reports, NotificationContext changeContext) {
    }

    /**
     * Dispatches a {@code "messageChange"} event and, when a status monitor exists,
     * records the instance's message queue size.
     *
     * @param instanceName  instance whose message queue changed
     * @param messages      pending messages, attached as the event data; may be {@code null}
     * @param changeContext notification context supplying the manager
     */
    @Override
    public void onMessage(String instanceName, List<Message> messages, NotificationContext changeContext) {
        logger.info("START: GenericClusterController.onMessage()");
        ClusterEvent event = new ClusterEvent("messageChange");
        event.addAttribute("helixmanager", changeContext.getManager());
        event.addAttribute("instanceName", instanceName);
        event.addAttribute("changeContext", changeContext);
        event.addAttribute("eventData", messages);
        handleEvent(event);
        // Read the field once: handleEvent may null it on FINALIZE between a check and a use.
        ClusterStatusMonitor monitor = _clusterStatusMonitor;
        if (monitor != null && messages != null) {
            monitor.addMessageQueueSize(instanceName, messages.size());
        }
        logger.info("END: GenericClusterController.onMessage()");
    }

    /**
     * Registers listeners for newly seen live instances (on INIT and CALLBACK
     * notifications) and dispatches a {@code "liveInstanceChange"} event.
     *
     * @param liveInstances current live instances; {@code null} is treated as empty
     * @param changeContext notification context supplying the manager
     */
    @Override
    public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
        logger.info("START: Generic GenericClusterController.onLiveInstanceChange()");
        if (liveInstances == null) {
            liveInstances = Collections.emptyList();
        }
        NotificationContext.Type type = changeContext.getType();
        if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
            checkLiveInstancesObservation(liveInstances, changeContext);
        }
        ClusterEvent event = new ClusterEvent("liveInstanceChange");
        event.addAttribute("helixmanager", changeContext.getManager());
        event.addAttribute("changeContext", changeContext);
        event.addAttribute("eventData", liveInstances);
        handleEvent(event);
        logger.info("END: Generic GenericClusterController.onLiveInstanceChange()");
    }

    /**
     * Starts the rebalance timer for every ideal state that declares a positive
     * rebalance timer period. Does nothing when the manager has no config accessor.
     *
     * @param manager     manager used for the timer task
     * @param idealStates ideal states to inspect; {@code null} is treated as empty
     */
    void checkRebalancingTimer(HelixManager manager, List<IdealState> idealStates) {
        if (manager.getConfigAccessor() == null) {
            logger.warn(manager.getInstanceName() + " config accessor doesn't exist. should be in file-based mode.");
            return;
        }
        if (idealStates == null) {
            return;
        }
        for (IdealState idealState : idealStates) {
            int period = idealState.getRebalanceTimerPeriod();
            if (period > 0) {
                startRebalancingTimer(period, manager);
            }
        }
    }

    /**
     * Dispatches an {@code "idealStateChange"} event and, unless the notification is a
     * FINALIZE, re-evaluates the periodic rebalance timer.
     *
     * @param idealStates   current ideal states, attached as the event data
     * @param changeContext notification context supplying the manager
     */
    @Override
    public void onIdealStateChange(List<IdealState> idealStates, NotificationContext changeContext) {
        logger.info("START: Generic GenericClusterController.onIdealStateChange()");
        ClusterEvent event = new ClusterEvent("idealStateChange");
        event.addAttribute("helixmanager", changeContext.getManager());
        event.addAttribute("changeContext", changeContext);
        event.addAttribute("eventData", idealStates);
        handleEvent(event);
        if (changeContext.getType() != NotificationContext.Type.FINALIZE) {
            checkRebalancingTimer(changeContext.getManager(), idealStates);
        }
        logger.info("END: Generic GenericClusterController.onIdealStateChange()");
    }

    /**
     * Dispatches a {@code "configChange"} event.
     *
     * @param configs       instance configs, attached as the event data
     * @param changeContext notification context supplying the manager
     */
    @Override
    public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) {
        logger.info("START: GenericClusterController.onConfigChange()");
        ClusterEvent event = new ClusterEvent("configChange");
        event.addAttribute("changeContext", changeContext);
        event.addAttribute("helixmanager", changeContext.getManager());
        event.addAttribute("eventData", configs);
        handleEvent(event);
        logger.info("END: GenericClusterController.onConfigChange()");
    }

    /**
     * Updates the paused flag from the cluster's pause signal, but only when this
     * instance is the recorded controller leader. On a paused-to-resumed transition a
     * {@code "resume"} event is dispatched.
     *
     * @param changeContext notification context supplying the manager
     */
    @Override
    public void onControllerChange(NotificationContext changeContext) {
        logger.info("START: GenericClusterController.onControllerChange()");
        HelixManager manager = changeContext.getManager();
        HelixDataAccessor accessor = manager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = accessor.keyBuilder();

        LiveInstance leader = (LiveInstance) accessor.getProperty(keyBuilder.controllerLeader());
        if (leader == null) {
            logger.warn("No controller exists for cluster:" + manager.getClusterName());
            return;
        }
        String leaderName = leader.getInstanceName();
        String instanceName = manager.getInstanceName();
        if (leaderName == null || !leaderName.equals(instanceName)) {
            logger.warn("leader name does NOT match, my name: " + instanceName + ", leader: " + leader);
            return;
        }

        PauseSignal pauseSignal = (PauseSignal) accessor.getProperty(keyBuilder.pause());
        if (pauseSignal != null) {
            _paused = true;
            logger.info("controller is now paused");
        } else if (_paused) {
            logger.info("controller is now resumed");
            // Clear the flag before dispatching, otherwise handleEvent would drop the event.
            _paused = false;
            ClusterEvent event = new ClusterEvent("resume");
            event.addAttribute("changeContext", changeContext);
            event.addAttribute("helixmanager", manager);
            // pauseSignal is null on this branch; it is still attached as the event data.
            event.addAttribute("eventData", pauseSignal);
            handleEvent(event);
        }
        logger.info("END: GenericClusterController.onControllerChange()");
    }

    /**
     * Ensures this controller listens to every given live instance: a current-state
     * listener per unseen session id and a message listener per unseen instance name.
     * Registration failures are logged and do not stop processing of other instances;
     * a failed registration is not recorded, so it is retried on the next call.
     *
     * @param liveInstances live instances to observe
     * @param changeContext notification context supplying the manager
     */
    protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances, NotificationContext changeContext) {
        for (LiveInstance instance : liveInstances) {
            String instanceName = instance.getId();
            String clientSessionId = instance.getSessionId();
            HelixManager manager = changeContext.getManager();

            if (!_instanceCurrentStateChangeSubscriptionSessionIds.contains(clientSessionId)) {
                try {
                    manager.addCurrentStateChangeListener(this, instanceName, clientSessionId);
                    _instanceCurrentStateChangeSubscriptionSessionIds.add(clientSessionId);
                    logger.info("Observing client session id: " + clientSessionId);
                } catch (Exception e) {
                    logger.error("Exception adding current state and message listener for instance:" + instanceName, e);
                }
            }

            if (!_instanceSubscriptionNames.contains(instanceName)) {
                try {
                    logger.info("Adding message listener for " + instanceName);
                    manager.addMessageListener(this, instanceName);
                    _instanceSubscriptionNames.add(instanceName);
                } catch (Exception e) {
                    logger.error("Exception adding message listener for instance:" + instanceName, e);
                }
            }
        }
    }

    /**
     * Timer task that injects a {@code "periodicalRebalance"} event into the enclosing
     * controller each time it fires.
     */
    class RebalanceTask
            extends TimerTask {
        HelixManager _manager;

        /**
         * @param manager manager placed on every generated event
         */
        public RebalanceTask(HelixManager manager) {
            _manager = manager;
        }

        /**
         * Builds a CALLBACK-typed context around the manager and dispatches a
         * {@code "periodicalRebalance"} event with an empty list as its data.
         */
        @Override
        public void run() {
            NotificationContext changeContext = new NotificationContext(_manager);
            changeContext.setType(NotificationContext.Type.CALLBACK);
            ClusterEvent event = new ClusterEvent("periodicalRebalance");
            event.addAttribute("helixmanager", changeContext.getManager());
            event.addAttribute("changeContext", changeContext);
            List<Object> dummy = new ArrayList<Object>();
            event.addAttribute("eventData", dummy);
            GenericHelixController.this.handleEvent(event);
        }
    }
}

