package com.alibaba.jstorm.callback.impl;

import backtype.storm.Config;
import backtype.storm.generated.Bolt;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.BaseCallback;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.cluster.StormStatus;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.nimbus.NimbusUtils;
import com.alibaba.jstorm.daemon.nimbus.StatusType;
import com.alibaba.jstorm.daemon.nimbus.TopologyAssign;
import com.alibaba.jstorm.daemon.nimbus.TopologyAssignEvent;
import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.task.TkHbCacheTime;
import com.alibaba.jstorm.utils.JStormUtils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/callback/impl/DoRebalanceTransitionCallback.class */
/**
 * Status-transition callback that performs the actual rebalance of a topology.
 *
 * <p>When a configuration override map is supplied, the callback rewrites the stored
 * topology code/conf, adds or removes task entries in the cluster state so that they
 * match the new parallelism hints, and then pushes a {@link TopologyAssignEvent} and
 * waits for it to finish. Finally it delegates to a {@code DelayStatusTransitionCallback}
 * for the {@code rebalancing -> done_rebalance} transition.
 */
public class DoRebalanceTransitionCallback extends BaseCallback {
    private static final Logger LOG = LoggerFactory.getLogger(DoRebalanceTransitionCallback.class);

    /** Component type labels stored in {@link TaskInfo} and used in log messages. */
    private static final String BOLT_TYPE = "bolt";
    private static final String SPOUT_TYPE = "spout";

    private final NimbusData data;
    private final String topologyid;
    private final StormStatus oldStatus;

    /** Ids of the tasks registered by this callback; used to roll back on failure. */
    private final Set<Integer> newTasks = new HashSet<Integer>();

    /**
     * @param nimbusData  nimbus runtime data (configuration, cluster state, heartbeat cache)
     * @param str         id of the topology being rebalanced
     * @param stormStatus status of the topology before the rebalance started
     */
    public DoRebalanceTransitionCallback(NimbusData nimbusData, String str, StormStatus stormStatus) {
        this.data = nimbusData;
        this.topologyid = str;
        this.oldStatus = stormStatus;
    }

    /**
     * Executes the rebalance.
     *
     * @param tArr callback arguments: {@code tArr[1]} is a {@link Boolean} passed to
     *             {@link TopologyAssignEvent#setReassign}, {@code tArr[2]} is a {@link Map}
     *             of configuration overrides (may be {@code null}, in which case the
     *             stored topology code/conf is left untouched). {@code tArr[0]} is not read.
     * @return the result of the follow-up {@code DelayStatusTransitionCallback}
     */
    @Override // com.alibaba.jstorm.callback.BaseCallback, com.alibaba.jstorm.callback.Callback
    @SuppressWarnings({"unchecked", "rawtypes"}) // configuration maps are untyped in the nimbus APIs
    public <T> Object execute(T... tArr) {
        // Set once setTaskInfo() has completed, i.e. once the new tasks are registered.
        boolean taskInfoUpdated = false;
        try {
            Boolean reassign = (Boolean) tArr[1];
            Map newConf = (Map) tArr[2];
            if (newConf != null) {
                boolean confChanged = false;
                Map<Object, Object> nimbusConf = this.data.getConf();
                Map topologyConf = StormConfig.read_nimbus_topology_conf(nimbusConf, this.topologyid);
                StormTopology oldTopology = StormConfig.read_nimbus_topology_code(nimbusConf, this.topologyid);
                StormTopology newTopology = NimbusUtils.normalizeTopology(newConf, oldTopology, true);

                // Work on copies so that adding ackers does not leak into the topology that is persisted.
                StormTopology oldCopy = oldTopology.m849deepCopy();
                StormTopology newCopy = newTopology.m849deepCopy();

                if (newConf.get(Config.TOPOLOGY_ACKER_EXECUTORS) != null) {
                    Common.add_acker(topologyConf, oldCopy);
                    Common.add_acker(newConf, newCopy);
                    int newAckerNum = JStormUtils.parseInt(newConf.get(Config.TOPOLOGY_ACKER_EXECUTORS)).intValue();
                    // Keep the old value boxed: it is only logged, and the stored conf may not define it.
                    Integer oldAckerNum = JStormUtils.parseInt(topologyConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
                    LOG.info("Update acker from oldAckerNum=" + oldAckerNum + " to ackerNum=" + newAckerNum);
                    topologyConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, Integer.valueOf(newAckerNum));
                    confChanged = true;
                }

                setTaskInfo(oldCopy, newCopy);
                taskInfoUpdated = true;
                StormConfig.write_nimbus_topology_code(nimbusConf, this.topologyid, Utils.serialize(newTopology));

                Integer newWorkerNum = JStormUtils.parseInt(newConf.get(Config.TOPOLOGY_WORKERS));
                if (newWorkerNum != null) {
                    Integer oldWorkerNum = JStormUtils.parseInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
                    topologyConf.put(Config.TOPOLOGY_WORKERS, newWorkerNum);
                    confChanged = true;
                    LOG.info("Update worker num from " + oldWorkerNum + " to " + newWorkerNum);
                }

                // NOTE(review): this alone does not mark the conf as changed, so it is only
                // persisted together with an acker/worker update - confirm that is intended.
                if (newConf.keySet().contains(Config.ISOLATION_SCHEDULER_MACHINES)) {
                    topologyConf.put(Config.ISOLATION_SCHEDULER_MACHINES, newConf.get(Config.ISOLATION_SCHEDULER_MACHINES));
                }

                if (confChanged) {
                    StormConfig.write_nimbus_topology_conf(nimbusConf, this.topologyid, topologyConf);
                }
            }

            TopologyAssignEvent event = new TopologyAssignEvent();
            event.setTopologyId(this.topologyid);
            event.setScratch(true);
            event.setOldStatus(this.oldStatus);
            event.setReassign(reassign.booleanValue());
            if (newConf != null) {
                event.setScaleTopology(true);
            }
            TopologyAssign.push(event);
            event.waitFinish();
        } catch (Exception e) {
            LOG.error("do-rebalance error!", e);
            if (taskInfoUpdated) {
                try {
                    this.data.getStormClusterState().remove_task(this.topologyid, this.newTasks);
                } catch (Exception e2) {
                    // Report the rollback failure itself, not the error that triggered the rollback.
                    LOG.error("Failed to rollback the changes on ZK for task-" + this.newTasks, e2);
                }
            }
        }
        return new DelayStatusTransitionCallback(this.data, this.topologyid, this.oldStatus, StatusType.rebalancing, StatusType.done_rebalance).execute(new Object[0]);
    }

    /**
     * Aligns the task entries in the cluster state with the new topology: bolts first,
     * then spouts. New task ids continue from the highest id currently registered.
     *
     * @throws java.util.NoSuchElementException if the topology has no registered task ids
     */
    private void setTaskInfo(StormTopology oldTopology, StormTopology newTopology) throws Exception {
        StormClusterState clusterState = this.data.getStormClusterState();
        int maxTaskId = new TreeSet<Integer>(clusterState.task_ids(this.topologyid)).last().intValue();
        int lastAfterBolts = setBoltInfo(oldTopology, newTopology, maxTaskId, clusterState);
        setSpoutInfo(oldTopology, newTopology, lastAfterBolts, clusterState);
    }

    /**
     * Adds or removes bolt tasks so that each bolt of the old topology matches the
     * parallelism hint of the same bolt in the new topology.
     *
     * @param i highest task id in use before this call
     * @return highest task id in use after this call
     */
    private int setBoltInfo(StormTopology stormTopology, StormTopology stormTopology2, int i, StormClusterState stormClusterState) throws Exception {
        Map<String, Bolt> oldBolts = stormTopology.get_bolts();
        Map<String, Bolt> newBolts = stormTopology2.get_bolts();
        int lastTaskId = i;
        for (Map.Entry<String, Bolt> entry : oldBolts.entrySet()) {
            String boltId = entry.getKey();
            int oldParallelism = entry.getValue().get_common().get_parallelism_hint();
            int newParallelism = newBolts.get(boltId).get_common().get_parallelism_hint();
            lastTaskId = scaleComponent(boltId, BOLT_TYPE, oldParallelism, newParallelism, lastTaskId, stormClusterState);
        }
        return lastTaskId;
    }

    /**
     * Adds or removes spout tasks so that each spout of the old topology matches the
     * parallelism hint of the same spout in the new topology.
     *
     * @param i highest task id in use before this call
     * @return highest task id in use after this call
     */
    private int setSpoutInfo(StormTopology stormTopology, StormTopology stormTopology2, int i, StormClusterState stormClusterState) throws Exception {
        Map<String, SpoutSpec> oldSpouts = stormTopology.get_spouts();
        Map<String, SpoutSpec> newSpouts = stormTopology2.get_spouts();
        int lastTaskId = i;
        for (Map.Entry<String, SpoutSpec> entry : oldSpouts.entrySet()) {
            String spoutId = entry.getKey();
            int oldParallelism = entry.getValue().get_common().get_parallelism_hint();
            int newParallelism = newSpouts.get(spoutId).get_common().get_parallelism_hint();
            lastTaskId = scaleComponent(spoutId, SPOUT_TYPE, oldParallelism, newParallelism, lastTaskId, stormClusterState);
        }
        return lastTaskId;
    }

    /**
     * Shrinks or grows one component to the requested parallelism.
     *
     * @return highest task id in use afterwards (unchanged unless tasks were added)
     */
    private int scaleComponent(String componentId, String componentType, int oldParallelism, int newParallelism,
            int lastTaskId, StormClusterState clusterState) throws Exception {
        if (oldParallelism > newParallelism) {
            removeHighestTasks(componentId, componentType, oldParallelism - newParallelism, clusterState);
            return lastTaskId;
        }
        if (newParallelism > oldParallelism) {
            return addTasks(componentId, componentType, newParallelism - oldParallelism, lastTaskId, clusterState);
        }
        return lastTaskId;
    }

    /** Removes {@code count} tasks of the component, starting with the highest task id. */
    private void removeHighestTasks(String componentId, String componentType, int count, StormClusterState clusterState) throws Exception {
        Iterator<Integer> highestFirst =
                new TreeSet<Integer>(clusterState.task_ids_by_componentId(this.topologyid, componentId)).descendingIterator();
        for (int removed = 0; removed < count; removed++) {
            int taskId = highestFirst.next().intValue();
            removeTask(this.topologyid, taskId, clusterState);
            LOG.info("Remove " + componentType + " task, taskId=" + taskId + " for " + componentId);
        }
    }

    /**
     * Registers {@code count} new tasks for the component with ids following {@code lastTaskId}
     * and records them in {@link #newTasks}.
     *
     * @return the id of the last task that was created
     */
    private int addTasks(String componentId, String componentType, int count, int lastTaskId, StormClusterState clusterState) throws Exception {
        Map<Integer, TaskInfo> created = new HashMap<Integer, TaskInfo>();
        int taskId = lastTaskId;
        for (int n = 0; n < count; n++) {
            taskId++;
            created.put(Integer.valueOf(taskId), new TaskInfo(componentId, componentType));
            this.newTasks.add(Integer.valueOf(taskId));
            LOG.info("Setup new " + componentType + " task, taskId=" + taskId + " for " + componentId);
        }
        clusterState.add_task(this.topologyid, created);
        return taskId;
    }

    /**
     * Removes a single task from the cluster state and drops its entry from the
     * task heartbeat cache, if such a cache exists for the topology.
     */
    private void removeTask(String str, int i, StormClusterState stormClusterState) throws Exception {
        // new HashSet(i) would only set the initial capacity; the id has to be added explicitly.
        Set<Integer> taskIds = new HashSet<Integer>();
        taskIds.add(Integer.valueOf(i));
        stormClusterState.remove_task(str, taskIds);
        Map<Integer, TkHbCacheTime> heartbeats = this.data.getTaskHeartbeatsCache(str, false);
        if (heartbeats != null) {
            heartbeats.remove(Integer.valueOf(i));
        }
    }
}
