/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.daemon.worker;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;
import backtype.storm.scheduler.WorkerSlot;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.daemon.worker.Worker;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskShutdownDameon;
import com.alibaba.jstorm.utils.JStormUtils;
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Callback that keeps a worker's outbound connections in line with the
 * topology assignment.
 *
 * <p>Each {@link #run()} compares the assignment version reported by
 * {@code zkCluster} and the supervisor's local topology timestamp against the
 * values seen previously. When either changed, the assignment is re-read, the
 * local task set / configuration is updated, and connections to remote workers
 * hosting outbound tasks are opened or closed. Every run also refreshes the
 * per-task "connected" status stored in {@link WorkerData}.
 *
 * <p>NOTE(review): {@link #getResult()} returns the {@code task.refresh.poll.secs}
 * setting; presumably the callback runner uses it as the re-run interval --
 * confirm against {@code RunnableCallback}.
 */
public class RefreshConnections extends RunnableCallback {
    private static final Logger LOG = LoggerFactory.getLogger(RefreshConnections.class);

    private final WorkerData workerData;
    /** Worker shutdown flag taken from {@code workerData}; not read in this class. */
    private final AtomicBoolean shutdown;
    /** Storm configuration obtained from {@code workerData.getStormConf()}. */
    private final Map conf;
    private final StormClusterState zkCluster;
    private final String topologyId;
    /** Ids of the tasks this worker sends to; replaced when the assignment changes. */
    private Set<Integer> outboundTasks;
    /** Open connections keyed by the remote worker slot. */
    private final Map<WorkerSlot, IConnection> nodeportSocket;
    private final IContext context;
    /** Outbound task id mapped to the worker slot currently hosting it. */
    private final ConcurrentHashMap<Integer, WorkerSlot> taskNodeport;
    /** Value of {@code task.refresh.poll.secs} (default 5), exposed through {@link #getResult()}. */
    private final Integer frequence;
    private final String supervisorId;
    /** Three times {@code task.heartbeat.frequency.secs} (default 10); not read in this class. */
    private final int taskTimeoutSecs;
    /** Last assignment version successfully processed; -1 until the first run completes. */
    private Integer assignmentVersion = -1;

    /**
     * Captures the collaborators and settings this callback needs from the worker.
     *
     * @param workerData shared state of the worker this callback belongs to
     */
    public RefreshConnections(WorkerData workerData) {
        this.workerData = workerData;
        this.shutdown = workerData.getShutdown();
        this.conf = workerData.getStormConf();
        this.zkCluster = workerData.getZkCluster();
        this.topologyId = workerData.getTopologyId();
        this.outboundTasks = workerData.getOutboundTasks();
        this.nodeportSocket = workerData.getNodeportSocket();
        this.context = workerData.getContext();
        this.taskNodeport = workerData.getTaskNodeport();
        this.supervisorId = workerData.getSupervisorId();
        this.frequence = JStormUtils.parseInt(this.conf.get("task.refresh.poll.secs"), 5);
        this.taskTimeoutSecs = JStormUtils.parseInt(this.conf.get("task.heartbeat.frequency.secs"), 10) * 3;
    }

    /**
     * Performs one refresh pass while holding this instance's monitor.
     *
     * <p>If the assignment cannot be read (or carries no workers) the pass is
     * abandoned without recording the new assignment version, so the next run
     * retries.
     *
     * @throws RuntimeException wrapping any exception raised during the pass
     */
    @Override
    public void run() {
        try {
            synchronized (this) {
                Integer recordedVersion = this.zkCluster.assignment_version(this.topologyId, this);
                boolean assignmentChanged = recordedVersion == null || !recordedVersion.equals(this.assignmentVersion);

                boolean supervisorTsChanged = false;
                Long localAssignmentTS = null;
                try {
                    localAssignmentTS = StormConfig.read_supervisor_topology_timestamp(this.conf, this.topologyId);
                    // Guard the unboxing: a null timestamp means "nothing to compare", not a failure.
                    if (localAssignmentTS != null) {
                        supervisorTsChanged = localAssignmentTS > this.workerData.getAssignmentTs();
                    }
                }
                catch (FileNotFoundException e) {
                    LOG.warn("Failed to read supervisor topology timeStamp for " + this.topologyId + " port=" + this.workerData.getPort(), e);
                }

                if (assignmentChanged || supervisorTsChanged) {
                    LOG.info("update worker data due to changed assignment!!!");
                    Assignment assignment = this.zkCluster.assignment_info(this.topologyId, this);
                    if (assignment == null) {
                        LOG.error("Failed to get Assignment of " + this.topologyId);
                        return;
                    }
                    if (supervisorTsChanged) {
                        this.applyLocalAssignmentChange(assignment, localAssignmentTS);
                    }
                    Set<ResourceWorkerSlot> workers = assignment.getWorkers();
                    if (workers == null) {
                        LOG.error("Failed to get taskToResource of " + this.topologyId);
                        return;
                    }
                    this.syncConnections(assignment, workers);
                }

                this.refreshOutboundTaskStatus();

                // Only remember a version that was actually read; a null keeps the next run in "changed" mode.
                if (recordedVersion != null) {
                    this.assignmentVersion = recordedVersion;
                }
            }
        }
        catch (Exception e) {
            LOG.error("Failed to refresh worker Connection", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the configured refresh setting.
     *
     * @return the value of {@code task.refresh.poll.secs} (default 5)
     */
    @Override
    public Object getResult() {
        return this.frequence;
    }

    /**
     * Reacts to a newer supervisor-side topology timestamp: either reloads the
     * topology configuration or re-creates/shuts down/updates local tasks, then
     * records the timestamp. Failures are logged and do not abort the refresh pass;
     * the timestamp is then left unchanged so the update is attempted again.
     */
    private void applyLocalAssignmentChange(Assignment assignment, Long localAssignmentTS) {
        try {
            if (assignment.getAssignmentType() == Assignment.AssignmentType.UpdateTopology) {
                this.reloadTopologyConf();
            } else {
                this.reassignTasks(assignment);
            }
            if (localAssignmentTS != null) {
                this.workerData.setAssignmentTs(localAssignmentTS);
            }
        }
        catch (Exception e) {
            LOG.warn("Failed to update worker data", e);
        }
    }

    /**
     * Re-reads the supervisor's copy of the topology configuration and pushes it
     * into the worker configuration, every task and the worker's update listener.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // configuration maps are untyped throughout this class
    private void reloadTopologyConf() throws Exception {
        LOG.info("Get config reload request for " + this.topologyId);
        List<TaskShutdownDameon> taskShutdowns = this.workerData.getShutdownTasks();
        Map newConf = StormConfig.read_supervisor_topology_conf(this.conf, this.topologyId);
        this.workerData.getStormConf().putAll(newConf);
        for (TaskShutdownDameon taskSD : taskShutdowns) {
            taskSD.update(newConf);
        }
        this.workerData.getUpdateListener().update(newConf);
        this.workerData.setAssignmentType(Assignment.AssignmentType.UpdateTopology);
    }

    /**
     * Brings the set of locally running tasks in line with {@code assignment} and
     * recomputes the outbound task set.
     */
    private void reassignTasks(Assignment assignment) throws Exception {
        // The three diffs compare against workerData.getTaskids(), so they must be
        // taken before updateWorkerData() is called with the new assignment.
        Set<Integer> addedTasks = this.getAddedTasks(assignment);
        Set<Integer> removedTasks = this.getRemovedTasks(assignment);
        Set<Integer> updatedTasks = this.getUpdatedTasks(assignment);

        this.workerData.updateWorkerData(assignment);
        this.workerData.updateKryoSerializer();

        this.shutdownTasks(removedTasks);
        this.createTasks(addedTasks);
        this.updateTasks(updatedTasks);

        this.syncOutboundTasks();
        this.workerData.setAssignmentType(Assignment.AssignmentType.Assign);
    }

    /**
     * Recomputes the outbound task set and, when it differs from the current one,
     * adds status entries for new tasks, drops entries for tasks no longer targeted
     * and publishes the new set.
     */
    private void syncOutboundTasks() throws Exception {
        Set<Integer> latestOutbound = Worker.worker_output_tasks(this.workerData);
        Set<Integer> previousOutbound = this.outboundTasks;
        if (previousOutbound != null && previousOutbound.equals(latestOutbound)) {
            return;
        }
        for (int taskId : latestOutbound) {
            if (previousOutbound != null && previousOutbound.contains(taskId)) {
                continue;
            }
            this.workerData.addOutboundTaskStatusIfAbsent(taskId);
        }
        // Iterate over a snapshot of the keys: entries are removed inside the loop, which
        // must not be able to invalidate the iterator whatever map implementation is used.
        Set<Integer> knownStatusIds = new HashSet<Integer>(this.workerData.getOutboundTaskStatus().keySet());
        for (int taskId : knownStatusIds) {
            if (latestOutbound.contains(taskId)) {
                continue;
            }
            this.workerData.removeOutboundTaskStatus(taskId);
        }
        this.workerData.setOutboundTasks(latestOutbound);
        this.outboundTasks = latestOutbound;
    }

    /**
     * Recomputes local task sets and the task-to-slot routing from {@code workers},
     * then opens connections to newly needed slots and closes the ones no longer needed.
     */
    private void syncConnections(Assignment assignment, Set<ResourceWorkerSlot> workers) throws Exception {
        this.workerData.updateWorkerToResource(workers);

        Map<Integer, ResourceWorkerSlot> outboundRouting = new HashMap<Integer, ResourceWorkerSlot>();
        Map<String, String> nodeHost = assignment.getNodeHost();
        Set<ResourceWorkerSlot> neededSlots = new HashSet<ResourceWorkerSlot>();
        HashSet<Integer> localTasks = new HashSet<Integer>();
        HashSet<Integer> localNodeTasks = new HashSet<Integer>();

        if (this.outboundTasks != null) {
            int ownPort = this.workerData.getPort().intValue();
            for (ResourceWorkerSlot worker : workers) {
                if (this.supervisorId.equals(worker.getNodeId())) {
                    // Same supervisor: task lives on this node ...
                    localNodeTasks.addAll(worker.getTasks());
                    if (worker.getPort() == ownPort) {
                        // ... and same port: task lives in this very worker.
                        localTasks.addAll(worker.getTasks());
                    }
                }
                for (Integer id : worker.getTasks()) {
                    if (!this.outboundTasks.contains(id)) {
                        continue;
                    }
                    outboundRouting.put(id, worker);
                    neededSlots.add(worker);
                }
            }
        }

        this.taskNodeport.putAll(outboundRouting);
        this.workerData.setLocalTasks(localTasks);
        this.workerData.setLocalNodeTasks(localNodeTasks);

        // keySet() is a live view of nodeportSocket, so both diffs are computed
        // before the map is modified below.
        Set<WorkerSlot> openSlots = this.nodeportSocket.keySet();
        Set<WorkerSlot> slotsToOpen = new HashSet<WorkerSlot>();
        Set<WorkerSlot> slotsToClose = new HashSet<WorkerSlot>();
        for (WorkerSlot slot : neededSlots) {
            if (!openSlots.contains(slot)) {
                slotsToOpen.add(slot);
            }
        }
        for (WorkerSlot slot : openSlots) {
            if (!neededSlots.contains(slot)) {
                slotsToClose.add(slot);
            }
        }

        for (WorkerSlot slot : slotsToOpen) {
            String host = nodeHost.get(slot.getNodeId());
            int port = slot.getPort();
            IConnection conn = this.context.connect(this.topologyId, host, port);
            this.nodeportSocket.put(slot, conn);
            LOG.info("Add connection to " + slot);
        }
        for (WorkerSlot slot : slotsToClose) {
            LOG.info("Remove connection to " + slot);
            IConnection stale = this.nodeportSocket.remove(slot);
            // The entry may already be gone if something else removed it in the meantime.
            if (stale != null) {
                stale.close();
            }
        }
    }

    /**
     * Records, per outbound task, whether it is currently reachable and marks the
     * worker's initial-connection status once every outbound task is reachable.
     * A missing outbound task set is treated like an empty one.
     */
    private void refreshOutboundTaskStatus() throws Exception {
        boolean allConnectionReady = true;
        if (this.outboundTasks != null) {
            for (Integer taskId : this.outboundTasks) {
                boolean isConnected = this.isOutTaskConnected(taskId);
                if (!isConnected) {
                    allConnectionReady = false;
                }
                this.workerData.updateOutboundTaskStatus(taskId, isConnected);
            }
        }
        if (allConnectionReady) {
            this.workerData.getWorkeInitConnectionStatus().getAndSet(true);
        }
    }

    /**
     * Returns the task ids that {@code assignment} places on this worker but that
     * the worker is not running yet. Returns whatever was collected (possibly
     * nothing) if the lookup fails.
     */
    private Set<Integer> getAddedTasks(Assignment assignment) {
        Set<Integer> added = new HashSet<Integer>();
        try {
            Set<Integer> assignedIds = assignment.getCurrentWorkerTasks(this.workerData.getSupervisorId(), this.workerData.getPort());
            for (Integer taskId : assignedIds) {
                if (!this.workerData.getTaskids().contains(taskId)) {
                    added.add(taskId);
                }
            }
        }
        catch (Exception e) {
            LOG.warn("Failed to get added task list for " + this.workerData.getTopologyId(), e);
        }
        return added;
    }

    /**
     * Returns the task ids the worker is running that {@code assignment} no longer
     * places on it. Returns whatever was collected (possibly nothing) if the
     * lookup fails.
     */
    private Set<Integer> getRemovedTasks(Assignment assignment) {
        Set<Integer> removed = new HashSet<Integer>();
        try {
            Set<Integer> assignedIds = assignment.getCurrentWorkerTasks(this.workerData.getSupervisorId(), this.workerData.getPort());
            for (Integer taskId : this.workerData.getTaskids()) {
                if (!assignedIds.contains(taskId)) {
                    removed.add(taskId);
                }
            }
        }
        catch (Exception e) {
            LOG.warn("Failed to get removed task list for " + this.workerData.getTopologyId(), e);
        }
        return removed;
    }

    /**
     * Returns the task ids that are both already running on this worker and still
     * placed on it by {@code assignment}. Returns whatever was collected (possibly
     * nothing) if the lookup fails.
     */
    private Set<Integer> getUpdatedTasks(Assignment assignment) {
        Set<Integer> retained = new HashSet<Integer>();
        try {
            Set<Integer> assignedIds = assignment.getCurrentWorkerTasks(this.workerData.getSupervisorId(), this.workerData.getPort());
            for (Integer taskId : assignedIds) {
                if (this.workerData.getTaskids().contains(taskId)) {
                    retained.add(taskId);
                }
            }
        }
        catch (Exception e) {
            LOG.warn("Failed to get updated task list for " + this.workerData.getTopologyId(), e);
        }
        return retained;
    }

    /**
     * Creates a task for every id in {@code tasks} and registers its shutdown handle
     * with the worker.
     *
     * @throws RuntimeException wrapping the first creation failure
     */
    private void createTasks(Set<Integer> tasks) {
        if (tasks == null) {
            return;
        }
        for (Integer taskId : tasks) {
            try {
                TaskShutdownDameon taskShutdown = Task.mk_task(this.workerData, taskId);
                this.workerData.addShutdownTask(taskShutdown);
            }
            catch (Exception e) {
                LOG.error("Failed to create task-" + taskId, e);
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Shuts down the tasks with the given ids; a failure for one task is logged and
     * does not prevent the remaining tasks from being shut down.
     */
    private void shutdownTasks(Set<Integer> tasks) {
        if (tasks == null) {
            return;
        }
        List<TaskShutdownDameon> handles = this.workerData.getShutdownDaemonbyTaskIds(tasks);
        for (TaskShutdownDameon handle : handles) {
            try {
                handle.shutdown();
            }
            catch (Exception e) {
                LOG.error("Failed to shutdown task-" + handle.getTaskId(), e);
            }
        }
    }

    /**
     * Asks the tasks with the given ids to refresh their task data; a failure for
     * one task is logged and does not prevent the remaining tasks from being updated.
     */
    private void updateTasks(Set<Integer> tasks) {
        if (tasks == null) {
            return;
        }
        List<TaskShutdownDameon> handles = this.workerData.getShutdownDaemonbyTaskIds(tasks);
        for (TaskShutdownDameon handle : handles) {
            try {
                handle.getTask().updateTaskData();
            }
            catch (Exception e) {
                LOG.error("Failed to update task-" + handle.getTaskId(), e);
            }
        }
    }

    /**
     * Tells whether messages for {@code taskId} can currently be delivered: either the
     * worker has an inner transfer entry for it, or a connection to the slot hosting it
     * exists and reports itself available.
     */
    private boolean isOutTaskConnected(int taskId) {
        if (this.workerData.getInnerTaskTransfer().get(taskId) != null) {
            return true;
        }
        WorkerSlot slot = this.taskNodeport.get(taskId);
        if (slot == null) {
            return false;
        }
        IConnection connection = this.nodeportSocket.get(slot);
        return connection != null && connection.available();
    }
}

