package org.apache.reef.io.network.group.impl.driver;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.io.network.group.api.config.OperatorSpec;
import org.apache.reef.io.network.group.api.driver.TaskNode;
import org.apache.reef.io.network.group.api.driver.Topology;
import org.apache.reef.io.network.group.api.operators.GroupCommOperator;
import org.apache.reef.io.network.group.impl.GroupChangesCodec;
import org.apache.reef.io.network.group.impl.GroupChangesImpl;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
import org.apache.reef.io.network.group.impl.config.GatherOperatorSpec;
import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
import org.apache.reef.io.network.group.impl.config.ScatterOperatorSpec;
import org.apache.reef.io.network.group.impl.config.parameters.CommGroupNameClass;
import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
import org.apache.reef.io.network.group.impl.config.parameters.GroupCommSenderStage;
import org.apache.reef.io.network.group.impl.config.parameters.OperatorNameClass;
import org.apache.reef.io.network.group.impl.config.parameters.ReduceFunctionParam;
import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
import org.apache.reef.io.network.group.impl.config.parameters.TreeTopologyFanOut;
import org.apache.reef.io.network.group.impl.operators.BroadcastReceiver;
import org.apache.reef.io.network.group.impl.operators.BroadcastSender;
import org.apache.reef.io.network.group.impl.operators.GatherReceiver;
import org.apache.reef.io.network.group.impl.operators.GatherSender;
import org.apache.reef.io.network.group.impl.operators.ReduceReceiver;
import org.apache.reef.io.network.group.impl.operators.ReduceSender;
import org.apache.reef.io.network.group.impl.operators.ScatterReceiver;
import org.apache.reef.io.network.group.impl.operators.ScatterSender;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.AvroConfigurationSerializer;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.impl.SingleThreadStage;

/* loaded from: input_file:org/apache/reef/io/network/group/impl/driver/TreeTopology.class */
public class TreeTopology implements Topology {
    private static final Logger LOG = Logger.getLogger(TreeTopology.class.getName());
    private final EStage<GroupCommunicationMessage> senderStage;
    private final Class<? extends Name<String>> groupName;
    private final Class<? extends Name<String>> operName;
    private final String driverId;
    private String rootId;
    private OperatorSpec operatorSpec;
    private TaskNode root;
    private TaskNode logicalRoot;
    private TaskNode prev;
    private final int fanOut;
    private final ConcurrentMap<String, TaskNode> nodes = new ConcurrentSkipListMap();
    private final ConfigurationSerializer confSer = new AvroConfigurationSerializer();

    @Deprecated
    public TreeTopology(EStage<GroupCommunicationMessage> eStage, Class<? extends Name<String>> cls, Class<? extends Name<String>> cls2, String str, int i, int i2) {
        this.senderStage = eStage;
        this.groupName = cls;
        this.operName = cls2;
        this.driverId = str;
        this.fanOut = i2;
        LOG.config(getQualifiedName() + "Tree Topology running with a fan-out of " + i2);
    }

    @Inject
    private TreeTopology(@Parameter(GroupCommSenderStage.class) EStage<GroupCommunicationMessage> eStage, @Parameter(CommGroupNameClass.class) Class<? extends Name<String>> cls, @Parameter(OperatorNameClass.class) Class<? extends Name<String>> cls2, @Parameter(DriverIdentifier.class) String str, @Parameter(TreeTopologyFanOut.class) int i) {
        this.senderStage = eStage;
        this.groupName = cls;
        this.operName = cls2;
        this.driverId = str;
        this.fanOut = i;
        LOG.config(getQualifiedName() + "Tree Topology running with a fan-out of " + i);
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public void setRootTask(String str) {
        LOG.entering("TreeTopology", "setRootTask", new Object[]{getQualifiedName(), str});
        this.rootId = str;
        LOG.exiting("TreeTopology", "setRootTask", getQualifiedName() + str);
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public String getRootId() {
        LOG.entering("TreeTopology", "getRootId", getQualifiedName());
        LOG.exiting("TreeTopology", "getRootId", getQualifiedName() + this.rootId);
        return this.rootId;
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public boolean isRootPresent() {
        LOG.entering("TreeTopology", "isRootPresent", getQualifiedName());
        boolean z = this.root != null;
        LOG.exiting("TreeTopology", "isRootPresent", String.format("%s%s", getQualifiedName(), Boolean.valueOf(z)));
        return z;
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public void setOperatorSpecification(OperatorSpec operatorSpec) {
        LOG.entering("TreeTopology", "setOperSpec", new Object[]{getQualifiedName(), operatorSpec});
        this.operatorSpec = operatorSpec;
        LOG.exiting("TreeTopology", "setOperSpec", getQualifiedName() + operatorSpec);
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public Configuration getTaskConfiguration(String str) {
        LOG.entering("TreeTopology", "getTaskConfig", new Object[]{getQualifiedName(), str});
        if (this.nodes.get(str) == null) {
            throw new RuntimeException(getQualifiedName() + str + " does not exist");
        }
        int nodeVersion = getNodeVersion(str);
        JavaConfigurationBuilder newConfigurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
        newConfigurationBuilder.bindNamedParameter(DataCodec.class, this.operatorSpec.getDataCodecClass());
        newConfigurationBuilder.bindNamedParameter(TaskVersion.class, Integer.toString(nodeVersion));
        if (this.operatorSpec instanceof BroadcastOperatorSpec) {
            if (str.equals(((BroadcastOperatorSpec) this.operatorSpec).getSenderId())) {
                newConfigurationBuilder.bindImplementation(GroupCommOperator.class, BroadcastSender.class);
            } else {
                newConfigurationBuilder.bindImplementation(GroupCommOperator.class, BroadcastReceiver.class);
            }
        } else if (this.operatorSpec instanceof ReduceOperatorSpec) {
            ReduceOperatorSpec reduceOperatorSpec = (ReduceOperatorSpec) this.operatorSpec;
            newConfigurationBuilder.bindNamedParameter(ReduceFunctionParam.class, reduceOperatorSpec.getRedFuncClass());
            if (str.equals(reduceOperatorSpec.getReceiverId())) {
                newConfigurationBuilder.bindImplementation(GroupCommOperator.class, ReduceReceiver.class);
            } else {
                newConfigurationBuilder.bindImplementation(GroupCommOperator.class, ReduceSender.class);
            }
        } else if (this.operatorSpec instanceof ScatterOperatorSpec) {
            if (str.equals(((ScatterOperatorSpec) this.operatorSpec).getSenderId())) {
                newConfigurationBuilder.bindImplementation(GroupCommOperator.class, ScatterSender.class);
            } else {
                newConfigurationBuilder.bindImplementation(GroupCommOperator.class, ScatterReceiver.class);
            }
        } else if (this.operatorSpec instanceof GatherOperatorSpec) {
            if (str.equals(((GatherOperatorSpec) this.operatorSpec).getReceiverId())) {
                newConfigurationBuilder.bindImplementation(GroupCommOperator.class, GatherReceiver.class);
            } else {
                newConfigurationBuilder.bindImplementation(GroupCommOperator.class, GatherSender.class);
            }
        }
        Configuration build = newConfigurationBuilder.build();
        LOG.exiting("TreeTopology", "getTaskConfig", getQualifiedName() + this.confSer.toString(build));
        return build;
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public int getNodeVersion(String str) {
        LOG.entering("TreeTopology", "getNodeVersion", new Object[]{getQualifiedName(), str});
        TaskNode taskNode = this.nodes.get(str);
        if (taskNode == null) {
            throw new RuntimeException(getQualifiedName() + str + " is not available on the nodes map");
        }
        int version = taskNode.getVersion();
        LOG.exiting("TreeTopology", "getNodeVersion", getQualifiedName() + " " + str + " " + version);
        return version;
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public void removeTask(String str) {
        LOG.entering("TreeTopology", "removeTask", new Object[]{getQualifiedName(), str});
        if (!this.nodes.containsKey(str)) {
            LOG.fine("Trying to remove a non-existent node in the task graph");
            LOG.exiting("TreeTopology", "removeTask", getQualifiedName());
        } else {
            if (str.equals(this.rootId)) {
                unsetRootNode(str);
            } else {
                removeChild(str);
            }
            LOG.exiting("TreeTopology", "removeTask", getQualifiedName() + str);
        }
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public void addTask(String str) {
        LOG.entering("TreeTopology", "addTask", new Object[]{getQualifiedName(), str});
        if (this.nodes.containsKey(str)) {
            LOG.fine("Got a request to add a task that is already in the graph. We need to block this request till the delete finishes. ***CAUTION***");
        }
        if (str.equals(this.rootId)) {
            setRootNode(str);
        } else {
            addChild(str);
        }
        LOG.exiting("TreeTopology", "addTask", getQualifiedName() + str);
    }

    private void addChild(String str) {
        LOG.entering("TreeTopology", "addChild", new Object[]{getQualifiedName(), str});
        LOG.finest(getQualifiedName() + "Adding leaf " + str);
        TaskNodeImpl taskNodeImpl = new TaskNodeImpl(this.senderStage, this.groupName, this.operName, str, this.driverId, false);
        if (this.logicalRoot != null) {
            addTaskNode(taskNodeImpl);
            this.prev = taskNodeImpl;
        }
        this.nodes.put(str, taskNodeImpl);
        LOG.exiting("TreeTopology", "addChild", getQualifiedName() + str);
    }

    private void addTaskNode(TaskNode taskNode) {
        LOG.entering("TreeTopology", "addTaskNode", new Object[]{getQualifiedName(), taskNode});
        if (this.logicalRoot.getNumberOfChildren() >= this.fanOut) {
            this.logicalRoot = this.logicalRoot.successor();
        }
        taskNode.setParent(this.logicalRoot);
        this.logicalRoot.addChild(taskNode);
        this.prev.setSibling(taskNode);
        LOG.exiting("TreeTopology", "addTaskNode", getQualifiedName() + taskNode);
    }

    private void removeChild(String str) {
        LOG.entering("TreeTopology", "removeChild", new Object[]{getQualifiedName(), str});
        if (this.root != null) {
            this.root.removeChild(this.nodes.get(str));
        }
        this.nodes.remove(str);
        LOG.exiting("TreeTopology", "removeChild", getQualifiedName() + str);
    }

    private void setRootNode(String str) {
        LOG.entering("TreeTopology", "setRootNode", new Object[]{getQualifiedName(), str});
        this.root = new TaskNodeImpl(this.senderStage, this.groupName, this.operName, str, this.driverId, true);
        this.logicalRoot = this.root;
        this.prev = this.root;
        Iterator<Map.Entry<String, TaskNode>> it = this.nodes.entrySet().iterator();
        while (it.hasNext()) {
            TaskNode value = it.next().getValue();
            addTaskNode(value);
            this.prev = value;
        }
        this.nodes.put(str, this.root);
        LOG.exiting("TreeTopology", "setRootNode", getQualifiedName() + str);
    }

    private void unsetRootNode(String str) {
        LOG.entering("TreeTopology", "unsetRootNode", new Object[]{getQualifiedName(), str});
        this.nodes.remove(this.rootId);
        this.root = null;
        Iterator<Map.Entry<String, TaskNode>> it = this.nodes.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().setParent(null);
        }
        LOG.exiting("TreeTopology", "unsetRootNode", getQualifiedName() + str);
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public void onFailedTask(String str) {
        LOG.entering("TreeTopology", "onFailedTask", new Object[]{getQualifiedName(), str});
        TaskNode taskNode = this.nodes.get(str);
        if (taskNode == null) {
            throw new RuntimeException(getQualifiedName() + str + " does not exist");
        }
        taskNode.onFailedTask();
        LOG.exiting("TreeTopology", "onFailedTask", getQualifiedName() + str);
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public void onRunningTask(String str) {
        LOG.entering("TreeTopology", "onRunningTask", new Object[]{getQualifiedName(), str});
        TaskNode taskNode = this.nodes.get(str);
        if (taskNode == null) {
            throw new RuntimeException(getQualifiedName() + str + " does not exist");
        }
        taskNode.onRunningTask();
        LOG.exiting("TreeTopology", "onRunningTask", getQualifiedName() + str);
    }

    @Override // org.apache.reef.io.network.group.api.driver.Topology
    public void onReceiptOfMessage(GroupCommunicationMessage groupCommunicationMessage) {
        LOG.entering("TreeTopology", "onReceiptOfMessage", new Object[]{getQualifiedName(), groupCommunicationMessage});
        switch (groupCommunicationMessage.getType()) {
            case TopologyChanges:
                onTopologyChanges(groupCommunicationMessage);
                break;
            case UpdateTopology:
                onUpdateTopology(groupCommunicationMessage);
                break;
            default:
                this.nodes.get(groupCommunicationMessage.getSrcid()).onReceiptOfAcknowledgement(groupCommunicationMessage);
                break;
        }
        LOG.exiting("TreeTopology", "onReceiptOfMessage", getQualifiedName() + groupCommunicationMessage);
    }

    /* JADX WARN: Type inference failed for: r8v2, types: [byte[], byte[][]] */
    private void onUpdateTopology(GroupCommunicationMessage groupCommunicationMessage) {
        LOG.entering("TreeTopology", "onUpdateTopology", new Object[]{getQualifiedName(), groupCommunicationMessage});
        LOG.fine(getQualifiedName() + "Update affected parts of Topology");
        String srcid = groupCommunicationMessage.getSrcid();
        int nodeVersion = getNodeVersion(srcid);
        LOG.finest(getQualifiedName() + "Creating NodeTopologyUpdateWaitStage to wait on nodes to be updated");
        SingleThreadStage singleThreadStage = new SingleThreadStage("NodeTopologyUpdateWaitStage", new TopologyUpdateWaitHandler(this.senderStage, this.groupName, this.operName, this.driverId, 0, srcid, nodeVersion, getQualifiedName(), TopologySerializer.encode(this.root)), this.nodes.size());
        ArrayList<TaskNode> arrayList = new ArrayList(this.nodes.size());
        LOG.finest(getQualifiedName() + "Checking which nodes need to be updated");
        for (TaskNode taskNode : this.nodes.values()) {
            if (taskNode.isRunning() && taskNode.hasChanges() && taskNode.resetTopologySetupSent()) {
                arrayList.add(taskNode);
            }
        }
        for (TaskNode taskNode2 : arrayList) {
            taskNode2.updatingTopology();
            LOG.fine(getQualifiedName() + "Asking " + taskNode2 + " to UpdateTopology");
            this.senderStage.onNext(Utils.bldVersionedGCM(this.groupName, this.operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, this.driverId, 0, taskNode2.getTaskId(), taskNode2.getVersion(), new byte[]{Utils.EMPTY_BYTE_ARR}));
        }
        singleThreadStage.onNext(arrayList);
        LOG.exiting("TreeTopology", "onUpdateTopology", getQualifiedName() + groupCommunicationMessage);
    }

    /* JADX WARN: Type inference failed for: r8v2, types: [byte[], byte[][]] */
    private void onTopologyChanges(GroupCommunicationMessage groupCommunicationMessage) {
        LOG.entering("TreeTopology", "onTopologyChanges", new Object[]{getQualifiedName(), groupCommunicationMessage});
        LOG.fine(getQualifiedName() + "Check TopologyChanges");
        String srcid = groupCommunicationMessage.getSrcid();
        boolean z = false;
        LOG.finest(getQualifiedName() + "Checking which nodes need to be updated");
        for (TaskNode taskNode : this.nodes.values()) {
            if (!taskNode.isRunning() || taskNode.hasChanges()) {
                z = true;
                break;
            }
        }
        GroupChangesImpl groupChangesImpl = new GroupChangesImpl(z);
        GroupChangesCodec groupChangesCodec = new GroupChangesCodec();
        LOG.fine(getQualifiedName() + "TopologyChanges: " + groupChangesImpl);
        this.senderStage.onNext(Utils.bldVersionedGCM(this.groupName, this.operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, this.driverId, 0, srcid, getNodeVersion(srcid), new byte[]{groupChangesCodec.encode(groupChangesImpl)}));
        LOG.exiting("TreeTopology", "onTopologyChanges", getQualifiedName() + groupCommunicationMessage);
    }

    private String getQualifiedName() {
        return Utils.simpleName(this.groupName) + ":" + Utils.simpleName(this.operName) + " - ";
    }
}
