/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.callback.impl;

import backtype.storm.generated.Bolt;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.BaseCallback;
import com.alibaba.jstorm.callback.impl.DelayStatusTransitionCallback;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.cluster.StormStatus;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.nimbus.NimbusUtils;
import com.alibaba.jstorm.daemon.nimbus.StatusType;
import com.alibaba.jstorm.daemon.nimbus.TopologyAssign;
import com.alibaba.jstorm.daemon.nimbus.TopologyAssignEvent;
import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.task.TkHbCacheTime;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Status-transition callback that carries out a topology rebalance.
 *
 * <p>When a new configuration is supplied, the stored topology code and
 * topology conf are rewritten, task entries are added to or removed from the
 * cluster state to match the new parallelism hints, and a
 * {@link TopologyAssignEvent} is pushed and waited on. Afterwards control is
 * always handed to a {@link DelayStatusTransitionCallback}.
 */
public class DoRebalanceTransitionCallback
extends BaseCallback {
    private static final Logger LOG = LoggerFactory.getLogger(DoRebalanceTransitionCallback.class);
    private final NimbusData data;
    private final String topologyid;
    private final StormStatus oldStatus;
    /** Ids of tasks created by this callback; used to roll back on failure. */
    private final Set<Integer> newTasks;

    /**
     * @param data       nimbus data used to reach the conf, cluster state and heartbeat cache
     * @param topologyid id of the topology being rebalanced
     * @param status     status forwarded to the assign event and to the delay callback
     */
    public DoRebalanceTransitionCallback(NimbusData data, String topologyid, StormStatus status) {
        this.data = data;
        this.topologyid = topologyid;
        this.oldStatus = status;
        this.newTasks = new HashSet<Integer>();
    }

    /**
     * Runs the rebalance.
     *
     * @param args {@code args[1]} is cast to {@link Boolean} (the reassign flag) and
     *             {@code args[2]} is cast to {@link Map} (the new configuration, may be
     *             {@code null}); {@code args[0]} is not read here
     * @return whatever the {@link DelayStatusTransitionCallback} returns
     */
    @Override
    public <T> Object execute(T ... args) {
        boolean isSetTaskInfo = false;
        try {
            Boolean reassign = (Boolean)args[1];
            Map conf = (Map)args[2];
            if (conf != null) {
                boolean isConfUpdate = false;
                Map<Object, Object> stormConf = this.data.getConf();
                Map topoConf = StormConfig.read_nimbus_topology_conf(stormConf, this.topologyid);
                StormTopology rawOldTopology = StormConfig.read_nimbus_topology_code(stormConf, this.topologyid);
                StormTopology rawNewTopology = NimbusUtils.normalizeTopology(conf, rawOldTopology, true);
                // Work on copies so that the acker component is not written back with the raw topology.
                StormTopology sysOldTopology = rawOldTopology.deepCopy();
                StormTopology sysNewTopology = rawNewTopology.deepCopy();
                if (conf.get("topology.acker.executors") != null) {
                    Common.add_acker(topoConf, sysOldTopology);
                    Common.add_acker(conf, sysNewTopology);
                    int ackerNum = JStormUtils.parseInt(conf.get("topology.acker.executors"));
                    // Kept boxed: the previous value is only logged, so a missing entry must not
                    // abort the rebalance with a NullPointerException on unboxing.
                    Integer oldAckerNum = JStormUtils.parseInt(topoConf.get("topology.acker.executors"));
                    LOG.info("Update acker from oldAckerNum=" + oldAckerNum + " to ackerNum=" + ackerNum);
                    topoConf.put("topology.acker.executors", ackerNum);
                    isConfUpdate = true;
                }
                this.setTaskInfo(sysOldTopology, sysNewTopology);
                isSetTaskInfo = true;
                StormConfig.write_nimbus_topology_code(stormConf, this.topologyid, Utils.serialize(rawNewTopology));
                Integer workerNum = JStormUtils.parseInt(conf.get("topology.workers"));
                if (workerNum != null) {
                    Integer oldWorkerNum = JStormUtils.parseInt(topoConf.get("topology.workers"));
                    topoConf.put("topology.workers", workerNum);
                    isConfUpdate = true;
                    LOG.info("Update worker num from " + oldWorkerNum + " to " + workerNum);
                }
                // NOTE(review): this update does not set isConfUpdate, so a change to the
                // isolation machines alone is not written back below - confirm this is intended.
                if (conf.keySet().contains("isolation.scheduler.machines")) {
                    topoConf.put("isolation.scheduler.machines", conf.get("isolation.scheduler.machines"));
                }
                if (isConfUpdate) {
                    StormConfig.write_nimbus_topology_conf(stormConf, this.topologyid, topoConf);
                }
            }
            TopologyAssignEvent event = new TopologyAssignEvent();
            event.setTopologyId(this.topologyid);
            event.setScratch(true);
            event.setOldStatus(this.oldStatus);
            event.setReassign(reassign);
            if (conf != null) {
                event.setScaleTopology(true);
            }
            TopologyAssign.push(event);
            event.waitFinish();
        }
        catch (Exception e) {
            LOG.error("do-rebalance error!", (Throwable)e);
            // Only roll back once the task bookkeeping has completed.
            if (isSetTaskInfo) {
                try {
                    StormClusterState clusterState = this.data.getStormClusterState();
                    clusterState.remove_task(this.topologyid, this.newTasks);
                }
                catch (Exception e1) {
                    // Log the rollback failure itself, not the exception that triggered the rollback.
                    LOG.error("Failed to rollback the changes on ZK for task-" + this.newTasks, (Throwable)e1);
                }
            }
        }
        DelayStatusTransitionCallback delayCallback = new DelayStatusTransitionCallback(this.data, this.topologyid, this.oldStatus, StatusType.rebalancing, StatusType.done_rebalance);
        return delayCallback.execute(new Object[0]);
    }

    /**
     * Aligns the task entries in cluster state with the parallelism hints of the new topology.
     * New task ids continue from the highest task id currently registered for the topology.
     *
     * @throws Exception propagated from the cluster state; also thrown
     *                   ({@link java.util.NoSuchElementException}) when no task id is registered
     */
    private void setTaskInfo(StormTopology oldTopology, StormTopology newTopology) throws Exception {
        StormClusterState clusterState = this.data.getStormClusterState();
        TreeSet<Integer> taskIds = new TreeSet<Integer>(clusterState.task_ids(this.topologyid));
        int cnt = taskIds.descendingIterator().next();
        cnt = this.setBoltInfo(oldTopology, newTopology, cnt, clusterState);
        this.setSpoutInfo(oldTopology, newTopology, cnt, clusterState);
    }

    /**
     * Applies parallelism changes for every bolt of the old topology.
     *
     * @param cnt highest task id handed out so far
     * @return highest task id handed out after processing all bolts
     */
    private int setBoltInfo(StormTopology oldTopology, StormTopology newTopology, int cnt, StormClusterState clusterState) throws Exception {
        Map<String, Bolt> bolts = newTopology.get_bolts();
        for (Map.Entry<String, Bolt> entry : oldTopology.get_bolts().entrySet()) {
            String boltName = entry.getKey();
            int oldParallelism = entry.getValue().get_common().get_parallelism_hint();
            int newParallelism = bolts.get(boltName).get_common().get_parallelism_hint();
            cnt = this.applyParallelismChange(boltName, "bolt", oldParallelism, newParallelism, cnt, clusterState);
        }
        return cnt;
    }

    /**
     * Applies parallelism changes for every spout of the old topology.
     *
     * @param cnt highest task id handed out so far
     * @return highest task id handed out after processing all spouts
     */
    private int setSpoutInfo(StormTopology oldTopology, StormTopology newTopology, int cnt, StormClusterState clusterState) throws Exception {
        Map<String, SpoutSpec> spouts = newTopology.get_spouts();
        for (Map.Entry<String, SpoutSpec> entry : oldTopology.get_spouts().entrySet()) {
            String spoutName = entry.getKey();
            int oldParallelism = entry.getValue().get_common().get_parallelism_hint();
            int newParallelism = spouts.get(spoutName).get_common().get_parallelism_hint();
            cnt = this.applyParallelismChange(spoutName, "spout", oldParallelism, newParallelism, cnt, clusterState);
        }
        return cnt;
    }

    /**
     * Adds or removes task entries for one component so that its task count follows the
     * change in parallelism hint.
     *
     * @param componentName  component id
     * @param componentType  {@code "bolt"} or {@code "spout"}; stored in new {@link TaskInfo}s
     *                       and used in log messages
     * @param oldParallelism parallelism hint in the old topology
     * @param newParallelism parallelism hint in the new topology
     * @param cnt            highest task id handed out so far
     * @return highest task id handed out after this component
     */
    private int applyParallelismChange(String componentName, String componentType, int oldParallelism, int newParallelism, int cnt, StormClusterState clusterState) throws Exception {
        if (oldParallelism > newParallelism) {
            // Scale down: drop the tasks with the highest ids first.
            int removedTaskNum = oldParallelism - newParallelism;
            TreeSet<Integer> taskIds = new TreeSet<Integer>(clusterState.task_ids_by_componentId(this.topologyid, componentName));
            Iterator<Integer> descendIterator = taskIds.descendingIterator();
            while (--removedTaskNum >= 0) {
                int taskId = descendIterator.next();
                this.removeTask(this.topologyid, taskId, clusterState);
                LOG.info("Remove " + componentType + " task, taskId=" + taskId + " for " + componentName);
            }
            return cnt;
        }
        if (oldParallelism == newParallelism) {
            return cnt;
        }
        // Scale up: allocate fresh, consecutive task ids and remember them for rollback.
        int delta = newParallelism - oldParallelism;
        Map<Integer, TaskInfo> taskInfoMap = new HashMap<Integer, TaskInfo>();
        for (int i = 1; i <= delta; ++i) {
            TaskInfo taskInfo = new TaskInfo(componentName, componentType);
            taskInfoMap.put(++cnt, taskInfo);
            this.newTasks.add(cnt);
            LOG.info("Setup new " + componentType + " task, taskId=" + cnt + " for " + componentName);
        }
        clusterState.add_task(this.topologyid, taskInfoMap);
        return cnt;
    }

    /**
     * Removes a single task from cluster state and from the task heartbeat cache, if one exists.
     */
    private void removeTask(String topologyId, int taskId, StormClusterState clusterState) throws Exception {
        // Must add the id explicitly: HashSet(int) only sets the initial capacity and
        // would leave the set empty.
        Set<Integer> taskIds = new HashSet<Integer>();
        taskIds.add(taskId);
        clusterState.remove_task(topologyId, taskIds);
        Map<Integer, TkHbCacheTime> tkHbs = this.data.getTaskHeartbeatsCache(topologyId, false);
        if (tkHbs != null) {
            tkHbs.remove(taskId);
        }
    }
}

