package org.apache.reef.io.network.group.impl.driver;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import org.apache.reef.io.network.group.api.driver.TaskNode;
import org.apache.reef.io.network.group.api.driver.TaskNodeStatus;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.utils.ConcurrentCountingMap;
import org.apache.reef.io.network.group.impl.utils.CountingMap;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.tang.annotations.Name;

/* loaded from: input_file:org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.class */
public class TaskNodeStatusImpl implements TaskNodeStatus {
    private static final Logger LOG = Logger.getLogger(TaskNodeStatusImpl.class.getName());
    private final Class<? extends Name<String>> groupName;
    private final Class<? extends Name<String>> operName;
    private final String taskId;
    private final TaskNode node;
    private final ConcurrentCountingMap<ReefNetworkGroupCommProtos.GroupCommMessage.Type, String> statusMap = new ConcurrentCountingMap<>();
    private final Set<String> activeNeighbors = new HashSet();
    private final CountingMap<String> neighborStatus = new CountingMap<>();
    private final AtomicBoolean updatingTopo = new AtomicBoolean(false);
    private final Object topoUpdateStageLock = new Object();
    private final Object topoSetupSentLock = new Object();

    public TaskNodeStatusImpl(Class<? extends Name<String>> cls, Class<? extends Name<String>> cls2, String str, TaskNode taskNode) {
        this.groupName = cls;
        this.operName = cls2;
        this.taskId = str;
        this.node = taskNode;
    }

    private boolean isDeadMsg(ReefNetworkGroupCommProtos.GroupCommMessage.Type type) {
        return type == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead || type == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead;
    }

    private boolean isAddMsg(ReefNetworkGroupCommProtos.GroupCommMessage.Type type) {
        return type == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd || type == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd;
    }

    private ReefNetworkGroupCommProtos.GroupCommMessage.Type getAckedMsg(ReefNetworkGroupCommProtos.GroupCommMessage.Type type) {
        switch (type) {
            case ParentAdded:
                return ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd;
            case ChildAdded:
                return ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd;
            case ParentRemoved:
                return ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead;
            case ChildRemoved:
                return ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead;
            default:
                return type;
        }
    }

    private void chkIamActiveToSendTopoSetup(ReefNetworkGroupCommProtos.GroupCommMessage.Type type) {
        LOG.entering("TaskNodeStatusImpl", "chkAndSendTopoSetup", new Object[]{getQualifiedName(), type});
        if (this.statusMap.isEmpty()) {
            LOG.finest(getQualifiedName() + "Empty status map.");
            this.node.checkAndSendTopologySetupMessage();
        } else {
            LOG.finest(getQualifiedName() + "Status map non-empty" + this.statusMap);
        }
        LOG.exiting("TaskNodeStatusImpl", "chkAndSendTopoSetup", getQualifiedName() + type);
    }

    @Override // org.apache.reef.io.network.group.api.driver.TaskNodeStatus
    public void onTopologySetupMessageSent() {
        LOG.entering("TaskNodeStatusImpl", "onTopologySetupMessageSent", getQualifiedName());
        this.neighborStatus.clear();
        synchronized (this.topoSetupSentLock) {
            this.topoSetupSentLock.notifyAll();
        }
        LOG.exiting("TaskNodeStatusImpl", "onTopologySetupMessageSent", getQualifiedName());
    }

    @Override // org.apache.reef.io.network.group.api.driver.TaskNodeStatus
    public boolean isActive(String str) {
        LOG.entering("TaskNodeStatusImpl", "isActive", new Object[]{getQualifiedName(), str});
        boolean contains = this.activeNeighbors.contains(str);
        LOG.exiting("TaskNodeStatusImpl", "isActive", getQualifiedName() + contains);
        return contains;
    }

    @Override // org.apache.reef.io.network.group.api.driver.TaskNodeStatus
    public void expectAckFor(ReefNetworkGroupCommProtos.GroupCommMessage.Type type, String str) {
        LOG.entering("TaskNodeStatusImpl", "expectAckFor", new Object[]{getQualifiedName(), type, str});
        LOG.finest(getQualifiedName() + "Adding " + str + " to sources");
        this.statusMap.add(type, str);
        LOG.exiting("TaskNodeStatusImpl", "expectAckFor", getQualifiedName() + "Sources from which ACKs for " + type + " are expected: " + this.statusMap.get(type));
    }

    @Override // org.apache.reef.io.network.group.api.driver.TaskNodeStatus
    public void clearStateAndReleaseLocks() {
        LOG.entering("TaskNodeStatusImpl", "clearStateAndReleaseLocks", getQualifiedName());
        this.statusMap.clear();
        this.activeNeighbors.clear();
        this.neighborStatus.clear();
        this.updatingTopo.compareAndSet(true, false);
        synchronized (this.topoSetupSentLock) {
            this.topoSetupSentLock.notifyAll();
        }
        synchronized (this.topoUpdateStageLock) {
            this.topoUpdateStageLock.notifyAll();
        }
        LOG.exiting("TaskNodeStatusImpl", "clearStateAndReleaseLocks", getQualifiedName());
    }

    @Override // org.apache.reef.io.network.group.api.driver.TaskNodeStatus
    public void updateFailureOf(String str) {
        LOG.entering("TaskNodeStatusImpl", "updateFailureOf", new Object[]{getQualifiedName(), str});
        this.activeNeighbors.remove(str);
        this.neighborStatus.remove(str);
        LOG.exiting("TaskNodeStatusImpl", "updateFailureOf", getQualifiedName());
    }

    @Override // org.apache.reef.io.network.group.api.driver.TaskNodeStatus
    public void processAcknowledgement(GroupCommunicationMessage groupCommunicationMessage) {
        LOG.entering("TaskNodeStatusImpl", "processMsg", new Object[]{getQualifiedName(), groupCommunicationMessage});
        ReefNetworkGroupCommProtos.GroupCommMessage.Type type = groupCommunicationMessage.getType();
        ReefNetworkGroupCommProtos.GroupCommMessage.Type ackedMsg = getAckedMsg(type);
        String destid = groupCommunicationMessage.getDestid();
        switch (type) {
            case ParentAdded:
            case ChildAdded:
            case ParentRemoved:
            case ChildRemoved:
                processNeighborAcks(groupCommunicationMessage, type, ackedMsg, destid);
                break;
            case TopologySetup:
                synchronized (this.topoUpdateStageLock) {
                    if (!this.updatingTopo.compareAndSet(true, false)) {
                        LOG.fine(getQualifiedName() + "Was expecting updateTopo to be true but it was false");
                    }
                    this.topoUpdateStageLock.notifyAll();
                }
                break;
            default:
                LOG.fine(getQualifiedName() + "Non ACK msg " + groupCommunicationMessage.getType() + " for " + groupCommunicationMessage.getDestid() + " unexpected");
                break;
        }
        LOG.exiting("TaskNodeStatusImpl", "processMsg", getQualifiedName());
    }

    private void processNeighborAcks(GroupCommunicationMessage groupCommunicationMessage, ReefNetworkGroupCommProtos.GroupCommMessage.Type type, ReefNetworkGroupCommProtos.GroupCommMessage.Type type2, String str) {
        LOG.entering("TaskNodeStatusImpl", "processNeighborAcks", getQualifiedName() + groupCommunicationMessage);
        if (!this.statusMap.containsKey(type2)) {
            LOG.fine(getQualifiedName() + "NodeStatusMsgProcessorStage There were no " + type2 + " msgs sent in the previous update cycle. Perhaps reset during failure. If context not indicative use ***CAUTION***");
        } else if (this.statusMap.contains(type2, str)) {
            this.statusMap.remove(type2, str);
            updateNeighborStatus(type2, str);
            checkNeighborActiveToSendTopoSetup(str);
            chkIamActiveToSendTopoSetup(type2);
        } else {
            LOG.fine(getQualifiedName() + "NodeStatusMsgProcessorStage Got " + type + " from a source(" + str + ") to whom ChildAdd was not sent. Perhaps reset during failure. If context not indicative use ***CAUTION***");
        }
        LOG.exiting("TaskNodeStatusImpl", "processNeighborAcks", getQualifiedName() + groupCommunicationMessage);
    }

    private void checkNeighborActiveToSendTopoSetup(String str) {
        LOG.entering("TaskNodeStatusImpl", "checkNeighborActiveToSendTopoSetup", new Object[]{getQualifiedName(), str});
        if (!this.statusMap.notContains(str)) {
            LOG.finest(getQualifiedName() + "Not done processing " + str + " acks yet. So it is still inactive");
        } else if (this.neighborStatus.get(str) > 0) {
            this.activeNeighbors.add(str);
            this.node.checkAndSendTopologySetupMessageFor(str);
        } else {
            LOG.finest(getQualifiedName() + str + " is not a neighbor anymore");
        }
        LOG.exiting("TaskNodeStatusImpl", "checkNeighborActiveToSendTopoSetup", getQualifiedName() + str);
    }

    private void updateNeighborStatus(ReefNetworkGroupCommProtos.GroupCommMessage.Type type, String str) {
        LOG.entering("TaskNodeStatusImpl", "updateNeighborStatus", new Object[]{getQualifiedName(), type, str});
        if (isAddMsg(type)) {
            this.neighborStatus.add(str);
        } else {
            if (!isDeadMsg(type)) {
                throw new RuntimeException("Can only deal with Neigbor ACKs while I received " + type + " from " + str);
            }
            this.neighborStatus.remove(str);
        }
        LOG.exiting("TaskNodeStatusImpl", "updateNeighborStatus", new Object[]{getQualifiedName(), type, str});
    }

    @Override // org.apache.reef.io.network.group.api.driver.TaskNodeStatus
    public void updatingTopology() {
        LOG.entering("TaskNodeStatusImpl", "updatingTopology", getQualifiedName());
        if (!this.updatingTopo.compareAndSet(false, true)) {
            throw new RuntimeException(getQualifiedName() + "Was expecting updateTopo to be false but it was true");
        }
        LOG.exiting("TaskNodeStatusImpl", "updatingTopology", getQualifiedName());
    }

    private String getQualifiedName() {
        return Utils.simpleName(this.groupName) + ":" + Utils.simpleName(this.operName) + ":(" + this.taskId + "," + this.node.getVersion() + ") - ";
    }

    @Override // org.apache.reef.io.network.group.api.driver.TaskNodeStatus
    public boolean hasChanges() {
        LOG.entering("TaskNodeStatusImpl", "hasChanges", getQualifiedName());
        boolean z = !this.statusMap.isEmpty();
        LOG.exiting("TaskNodeStatusImpl", "hasChanges", getQualifiedName() + z);
        return z;
    }

    @Override // org.apache.reef.io.network.group.api.driver.TaskNodeStatus
    public void waitForTopologySetup() {
        LOG.entering("TaskNodeStatusImpl", "waitForTopologySetup", getQualifiedName());
        LOG.finest("Waiting to acquire topoUpdateStageLock");
        synchronized (this.topoUpdateStageLock) {
            LOG.finest(getQualifiedName() + "Acquired topoUpdateStageLock. updatingTopo: " + this.updatingTopo.get());
            while (this.updatingTopo.get() && this.node.isRunning()) {
                try {
                    LOG.finest(getQualifiedName() + "Waiting on topoUpdateStageLock");
                    this.topoUpdateStageLock.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException("InterruptedException in NodeTopologyUpdateWaitStage while waiting for receiving TopologySetup", e);
                }
            }
        }
    }
}
