package com.alibaba.jstorm.daemon.worker;

import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;
import backtype.storm.scheduler.WorkerSlot;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskShutdownDameon;
import com.alibaba.jstorm.utils.JStormUtils;
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/daemon/worker/RefreshConnections.class */
public class RefreshConnections extends RunnableCallback {
    private static Logger LOG = LoggerFactory.getLogger(RefreshConnections.class);
    private WorkerData workerData;
    private AtomicBoolean shutdown;
    private Map conf;
    private StormClusterState zkCluster;
    private String topologyId;
    private Set<Integer> outboundTasks;
    private Map<WorkerSlot, IConnection> nodeportSocket;
    private IContext context;
    private ConcurrentHashMap<Integer, WorkerSlot> taskNodeport;
    private Integer frequence;
    private String supervisorId;
    private int taskTimeoutSecs;
    private Integer assignmentVersion = -1;

    public RefreshConnections(WorkerData workerData) {
        this.workerData = workerData;
        this.shutdown = workerData.getShutdown();
        this.conf = workerData.getStormConf();
        this.zkCluster = workerData.getZkCluster();
        this.topologyId = workerData.getTopologyId();
        this.outboundTasks = workerData.getOutboundTasks();
        this.nodeportSocket = workerData.getNodeportSocket();
        this.context = workerData.getContext();
        this.taskNodeport = workerData.getTaskNodeport();
        this.supervisorId = workerData.getSupervisorId();
        this.frequence = JStormUtils.parseInt(this.conf.get(Config.TASK_REFRESH_POLL_SECS), 5);
        this.taskTimeoutSecs = JStormUtils.parseInt(this.conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS), 10).intValue();
        this.taskTimeoutSecs *= 3;
    }

    @Override // com.alibaba.jstorm.callback.RunnableCallback, java.lang.Runnable
    public void run() {
        try {
            synchronized (this) {
                Integer assignment_version = this.zkCluster.assignment_version(this.topologyId, this);
                boolean z = assignment_version == null || !assignment_version.equals(this.assignmentVersion);
                boolean z2 = false;
                Long l = null;
                try {
                    l = Long.valueOf(StormConfig.read_supervisor_topology_timestamp(this.conf, this.topologyId));
                    z2 = l.longValue() > this.workerData.getAssignmentTs().longValue();
                } catch (FileNotFoundException e) {
                    LOG.warn("Failed to read supervisor topology timeStamp for " + this.topologyId + " port=" + this.workerData.getPort(), e);
                }
                if (z || z2) {
                    LOG.info("update worker data due to changed assignment!!!");
                    Assignment assignment_info = this.zkCluster.assignment_info(this.topologyId, this);
                    if (assignment_info == null) {
                        LOG.error("Failed to get Assignment of " + this.topologyId);
                        return;
                    }
                    if (z2) {
                        try {
                            if (assignment_info.getAssignmentType() == Assignment.AssignmentType.UpdateTopology) {
                                LOG.info("Get config reload request for " + this.topologyId);
                                List<TaskShutdownDameon> shutdownTasks = this.workerData.getShutdownTasks();
                                Map<? extends Object, ? extends Object> read_supervisor_topology_conf = StormConfig.read_supervisor_topology_conf(this.conf, this.topologyId);
                                this.workerData.getStormConf().putAll(read_supervisor_topology_conf);
                                Iterator<TaskShutdownDameon> it = shutdownTasks.iterator();
                                while (it.hasNext()) {
                                    it.next().update(read_supervisor_topology_conf);
                                }
                                this.workerData.getUpdateListener().update(read_supervisor_topology_conf);
                                this.workerData.setAssignmentType(Assignment.AssignmentType.UpdateTopology);
                            } else {
                                Set<Integer> addedTasks = getAddedTasks(assignment_info);
                                Set<Integer> removedTasks = getRemovedTasks(assignment_info);
                                Set<Integer> updatedTasks = getUpdatedTasks(assignment_info);
                                this.workerData.updateWorkerData(assignment_info);
                                this.workerData.updateKryoSerializer();
                                shutdownTasks(removedTasks);
                                createTasks(addedTasks);
                                updateTasks(updatedTasks);
                                Set<Integer> worker_output_tasks = Worker.worker_output_tasks(this.workerData);
                                if (!this.outboundTasks.equals(worker_output_tasks)) {
                                    Iterator<Integer> it2 = worker_output_tasks.iterator();
                                    while (it2.hasNext()) {
                                        int intValue = it2.next().intValue();
                                        if (!this.outboundTasks.contains(Integer.valueOf(intValue))) {
                                            this.workerData.addOutboundTaskStatusIfAbsent(Integer.valueOf(intValue));
                                        }
                                    }
                                    Iterator<Integer> it3 = this.workerData.getOutboundTaskStatus().keySet().iterator();
                                    while (it3.hasNext()) {
                                        int intValue2 = it3.next().intValue();
                                        if (!worker_output_tasks.contains(Integer.valueOf(intValue2))) {
                                            this.workerData.removeOutboundTaskStatus(Integer.valueOf(intValue2));
                                        }
                                    }
                                    this.workerData.setOutboundTasks(worker_output_tasks);
                                    this.outboundTasks = worker_output_tasks;
                                }
                                this.workerData.setAssignmentType(Assignment.AssignmentType.Assign);
                            }
                            if (l != null) {
                                this.workerData.setAssignmentTs(l);
                            }
                        } catch (Exception e2) {
                            LOG.warn("Failed to update worker data", e2);
                        }
                    }
                    Set<ResourceWorkerSlot> workers = assignment_info.getWorkers();
                    if (workers == null) {
                        LOG.error("Failed to get taskToResource of " + this.topologyId);
                        return;
                    }
                    this.workerData.updateWorkerToResource(workers);
                    HashMap hashMap = new HashMap();
                    Map<String, String> nodeHost = assignment_info.getNodeHost();
                    HashSet<WorkerSlot> hashSet = new HashSet();
                    HashSet hashSet2 = new HashSet();
                    HashSet hashSet3 = new HashSet();
                    if (this.outboundTasks != null) {
                        for (ResourceWorkerSlot resourceWorkerSlot : workers) {
                            if (this.supervisorId.equals(resourceWorkerSlot.getNodeId())) {
                                hashSet3.addAll(resourceWorkerSlot.getTasks());
                            }
                            if (this.supervisorId.equals(resourceWorkerSlot.getNodeId()) && resourceWorkerSlot.getPort() == this.workerData.getPort().intValue()) {
                                hashSet2.addAll(resourceWorkerSlot.getTasks());
                            }
                            for (Integer num : resourceWorkerSlot.getTasks()) {
                                if (this.outboundTasks.contains(num)) {
                                    hashMap.put(num, resourceWorkerSlot);
                                    hashSet.add(resourceWorkerSlot);
                                }
                            }
                        }
                    }
                    this.taskNodeport.putAll(hashMap);
                    this.workerData.setLocalTasks(hashSet2);
                    this.workerData.setLocalNodeTasks(hashSet3);
                    Set<WorkerSlot> keySet = this.nodeportSocket.keySet();
                    HashSet<WorkerSlot> hashSet4 = new HashSet();
                    HashSet<WorkerSlot> hashSet5 = new HashSet();
                    for (WorkerSlot workerSlot : hashSet) {
                        if (!keySet.contains(workerSlot)) {
                            hashSet4.add(workerSlot);
                        }
                    }
                    for (WorkerSlot workerSlot2 : keySet) {
                        if (!hashSet.contains(workerSlot2)) {
                            hashSet5.add(workerSlot2);
                        }
                    }
                    for (WorkerSlot workerSlot3 : hashSet4) {
                        this.nodeportSocket.put(workerSlot3, this.context.connect(this.topologyId, nodeHost.get(workerSlot3.getNodeId()), workerSlot3.getPort()));
                        LOG.info("Add connection to " + workerSlot3);
                    }
                    for (WorkerSlot workerSlot4 : hashSet5) {
                        LOG.info("Remove connection to " + workerSlot4);
                        this.nodeportSocket.remove(workerSlot4).close();
                    }
                }
                boolean z3 = true;
                for (Integer num2 : this.outboundTasks) {
                    boolean isOutTaskConnected = isOutTaskConnected(num2.intValue());
                    if (!isOutTaskConnected) {
                        z3 = isOutTaskConnected;
                    }
                    this.workerData.updateOutboundTaskStatus(num2, isOutTaskConnected);
                }
                if (z3) {
                    this.workerData.getWorkeInitConnectionStatus().getAndSet(z3);
                }
                if (assignment_version != null) {
                    this.assignmentVersion = assignment_version;
                }
            }
        } catch (Exception e3) {
            LOG.error("Failed to refresh worker Connection", e3);
            throw new RuntimeException(e3);
        }
    }

    @Override // com.alibaba.jstorm.callback.RunnableCallback
    public Object getResult() {
        return this.frequence;
    }

    private Set<Integer> getAddedTasks(Assignment assignment) {
        HashSet hashSet = new HashSet();
        try {
            for (Integer num : assignment.getCurrentWorkerTasks(this.workerData.getSupervisorId(), this.workerData.getPort().intValue())) {
                if (!this.workerData.getTaskids().contains(num)) {
                    hashSet.add(num);
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to get added task list for" + this.workerData.getTopologyId());
        }
        return hashSet;
    }

    private Set<Integer> getRemovedTasks(Assignment assignment) {
        HashSet hashSet = new HashSet();
        try {
            Set<Integer> currentWorkerTasks = assignment.getCurrentWorkerTasks(this.workerData.getSupervisorId(), this.workerData.getPort().intValue());
            for (Integer num : this.workerData.getTaskids()) {
                if (!currentWorkerTasks.contains(num)) {
                    hashSet.add(num);
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to get removed task list for" + this.workerData.getTopologyId());
        }
        return hashSet;
    }

    private Set<Integer> getUpdatedTasks(Assignment assignment) {
        HashSet hashSet = new HashSet();
        try {
            for (Integer num : assignment.getCurrentWorkerTasks(this.workerData.getSupervisorId(), this.workerData.getPort().intValue())) {
                if (this.workerData.getTaskids().contains(num)) {
                    hashSet.add(num);
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to get updated task list for" + this.workerData.getTopologyId());
        }
        return hashSet;
    }

    private void createTasks(Set<Integer> set) {
        if (set == null) {
            return;
        }
        for (Integer num : set) {
            try {
                this.workerData.addShutdownTask(Task.mk_task(this.workerData, num.intValue()));
            } catch (Exception e) {
                LOG.error("Failed to create task-" + num, e);
                throw new RuntimeException(e);
            }
        }
    }

    private void shutdownTasks(Set<Integer> set) {
        if (set == null) {
            return;
        }
        for (TaskShutdownDameon taskShutdownDameon : this.workerData.getShutdownDaemonbyTaskIds(set)) {
            try {
                taskShutdownDameon.shutdown();
            } catch (Exception e) {
                LOG.error("Failed to shutdown task-" + taskShutdownDameon.getTaskId(), e);
            }
        }
    }

    private void updateTasks(Set<Integer> set) {
        if (set == null) {
            return;
        }
        for (TaskShutdownDameon taskShutdownDameon : this.workerData.getShutdownDaemonbyTaskIds(set)) {
            try {
                taskShutdownDameon.getTask().updateTaskData();
            } catch (Exception e) {
                LOG.error("Failed to update task-" + taskShutdownDameon.getTaskId(), e);
            }
        }
    }

    private boolean isOutTaskConnected(int i) {
        IConnection iConnection;
        boolean z = false;
        if (this.workerData.getInnerTaskTransfer().get(Integer.valueOf(i)) != null) {
            z = true;
        } else {
            WorkerSlot workerSlot = this.taskNodeport.get(Integer.valueOf(i));
            if (workerSlot != null && (iConnection = this.nodeportSocket.get(workerSlot)) != null) {
                z = iConnection.available();
            }
        }
        return z;
    }
}
