package org.apache.flink.runtime.io.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.RuntimeEnvironment;
import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProviderBroker;
import org.apache.flink.runtime.io.network.bufferprovider.DiscardBufferPool;
import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
import org.apache.flink.runtime.io.network.channels.Channel;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.io.network.channels.ChannelType;
import org.apache.flink.runtime.io.network.channels.InputChannel;
import org.apache.flink.runtime.io.network.channels.OutputChannel;
import org.apache.flink.runtime.io.network.gates.GateID;
import org.apache.flink.runtime.io.network.gates.InputGate;
import org.apache.flink.runtime.io.network.gates.OutputGate;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.profiling.ProfilingUtils;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.taskmanager.Task;

/* loaded from: input_file:org/apache/flink/runtime/io/network/ChannelManager.class */
public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker {
    /** Logger for this class. */
    private static final Log LOG = LogFactory.getLog(ChannelManager.class);
    /** Service queried for a channel's receiver when the receiver cache holds no entry for it. */
    private final ChannelLookupProtocol channelLookupService;
    /** Connection info of this instance; passed along with lookups and used for the host name in error messages. */
    private final InstanceConnectionInfo connectionInfo;
    /** Input and output channels currently registered, keyed by their own channel ID. */
    private final Map<ChannelID, Channel> channels;
    /** Registered buffer pool owners: one entry per task (keyed by vertex ID) and one per input gate (keyed by gate ID). */
    private final Map<AbstractID, LocalBufferPoolOwner> localBuffersPools;
    /** Cached receiver lists, keyed by channel ID; filled by hints and by successful lookups. */
    private final Map<ChannelID, EnvelopeReceiverList> receiverCache;
    /** Global pool whose buffer count is divided among the registered buffer pool owners. */
    private final GlobalBufferPool globalBufferPool;
    /** Envelopes destined for remote receivers are enqueued here. */
    private final NetworkConnectionManager networkConnectionManager;
    /** Socket address built from this instance's address and data port; embedded in sender hints. */
    private final InetSocketAddress ourAddress;
    /** Buffer provider handed out when the receiver of a lookup is no longer available. */
    private final DiscardBufferPool discardBufferPool;

    /**
     * Compiler-generated holder (originally {@code ChannelManager$1}) for the lookup table that
     * translates a {@link ChannelType} ordinal into a switch-case label:
     * {@code IN_MEMORY -> 1}, {@code NETWORK -> 2}. Any other ordinal keeps the array default 0.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$io$network$channels$ChannelType;

        static {
            final int[] caseLabels = new int[ChannelType.values().length];
            try {
                caseLabels[ChannelType.IN_MEMORY.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at run time: its slot stays 0 and matches no case.
            }
            try {
                caseLabels[ChannelType.NETWORK.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant absent at run time: its slot stays 0 and matches no case.
            }
            $SwitchMap$org$apache$flink$runtime$io$network$channels$ChannelType = caseLabels;
        }
    }

    /**
     * Creates the channel manager, allocates the global buffer pool and starts the network
     * connection manager with this instance as its envelope dispatcher.
     *
     * <p>All bookkeeping maps are assigned before {@code start(this)} is called, so the network
     * connection manager never observes a partially constructed dispatcher.
     *
     * @param channelLookupProtocol service used to resolve receivers that are not cached
     * @param instanceConnectionInfo connection info of this instance (address, data port, host name)
     * @param i first argument of the {@link GlobalBufferPool} constructor
     *          (presumably the number of buffers; confirm against GlobalBufferPool)
     * @param i2 second argument of the {@link GlobalBufferPool} constructor
     *           (presumably the buffer size; confirm against GlobalBufferPool)
     * @param networkConnectionManager manager that is started here and used for remote transfers
     * @throws IOException if the buffer pool cannot be created or the network side cannot be set up;
     *         the original failure is attached as the cause
     */
    public ChannelManager(ChannelLookupProtocol channelLookupProtocol, InstanceConnectionInfo instanceConnectionInfo, int i, int i2, NetworkConnectionManager networkConnectionManager) throws IOException {
        this.channelLookupService = channelLookupProtocol;
        this.connectionInfo = instanceConnectionInfo;
        this.networkConnectionManager = networkConnectionManager;
        this.channels = new ConcurrentHashMap<ChannelID, Channel>();
        this.receiverCache = new ConcurrentHashMap<ChannelID, EnvelopeReceiverList>();
        this.localBuffersPools = new ConcurrentHashMap<AbstractID, LocalBufferPoolOwner>();
        try {
            this.globalBufferPool = new GlobalBufferPool(i, i2);
        } catch (Throwable th) {
            throw new IOException("Failed to instantiate GlobalBufferPool.", th);
        }
        try {
            this.discardBufferPool = new DiscardBufferPool();
            this.ourAddress = new InetSocketAddress(instanceConnectionInfo.address(), instanceConnectionInfo.dataPort());
            // Publish 'this' last: from here on envelopes may be dispatched to this instance.
            networkConnectionManager.start(this);
        } catch (Throwable th) {
            // Still surfaced as IOException (as before), but no longer blamed on the buffer pool.
            throw new IOException("Failed to initialize the channel manager's network environment.", th);
        }
    }

    /**
     * Shuts down the network connection manager and destroys the global buffer pool.
     *
     * <p>The pool is destroyed even when shutting down the network connection manager fails, so a
     * failing network shutdown no longer leaves the pool undestroyed.
     *
     * @throws IOException if shutting down the network connection manager or destroying the pool fails
     */
    public void shutdown() throws IOException {
        try {
            this.networkConnectionManager.shutdown();
        } finally {
            this.globalBufferPool.destroy();
        }
    }

    /**
     * Registers all channels and buffer pool owners of the given task with this manager and then
     * redistributes the global buffers.
     *
     * <p>Receiver hints are pre-populated so that no lookup is needed for them:
     * <ul>
     *   <li>in-memory output channel: own ID -&gt; connected ID</li>
     *   <li>network output channel: connected ID -&gt; own ID</li>
     *   <li>in-memory input channel: own ID -&gt; connected ID</li>
     * </ul>
     *
     * @param task the task whose runtime environment, gates and channels are registered
     * @throws InsufficientResourcesException if the global pool has fewer buffers than channels
     *         once this task's channels are counted in
     * @throws IllegalStateException if a buffer pool owner is already registered for the task's vertex ID
     */
    public void register(Task task) throws InsufficientResourcesException {
        ensureBufferAvailability(task);
        RuntimeEnvironment runtimeEnvironment = task.getRuntimeEnvironment();
        runtimeEnvironment.registerGlobalBufferPool(this.globalBufferPool);
        if (this.localBuffersPools.containsKey(task.getVertexID())) {
            throw new IllegalStateException("Vertex " + task.getVertexID() + " has a previous buffer pool owner");
        }
        Iterator<OutputGate> outputGates = runtimeEnvironment.outputGates().iterator();
        while (outputGates.hasNext()) {
            for (OutputChannel outputChannel : outputGates.next().channels()) {
                outputChannel.registerEnvelopeDispatcher(this);
                // Switch on the enum itself rather than on an ordinal table with an unrelated
                // constant as case label.
                switch (outputChannel.getChannelType()) {
                    case IN_MEMORY:
                        addReceiverListHint(outputChannel.getID(), outputChannel.getConnectedId());
                        break;
                    case NETWORK:
                        addReceiverListHint(outputChannel.getConnectedId(), outputChannel.getID());
                        break;
                    default:
                        // Other channel types get no pre-populated hint.
                        break;
                }
                this.channels.put(outputChannel.getID(), outputChannel);
            }
        }
        // The runtime environment itself is the buffer pool owner for the task's vertex.
        this.localBuffersPools.put(task.getVertexID(), runtimeEnvironment);
        for (InputGate<? extends IOReadableWritable> inputGate : runtimeEnvironment.inputGates()) {
            inputGate.registerGlobalBufferPool(this.globalBufferPool);
            for (int i = 0; i < inputGate.getNumberOfInputChannels(); i++) {
                InputChannel<? extends IOReadableWritable> inputChannel = inputGate.getInputChannel(i);
                inputChannel.registerEnvelopeDispatcher(this);
                if (inputChannel.getChannelType() == ChannelType.IN_MEMORY) {
                    addReceiverListHint(inputChannel.getID(), inputChannel.getConnectedId());
                }
                this.channels.put(inputChannel.getID(), inputChannel);
            }
            // Each input gate is a buffer pool owner of its own, keyed by its gate ID.
            this.localBuffersPools.put(inputGate.getGateID(), inputGate);
        }
        redistributeBuffers();
    }

    /**
     * Removes all channels and buffer pool owners of the given task from this manager and then
     * redistributes the global buffers among the remaining owners.
     *
     * <p>Receiver cache entries are evicted by channel ID. Passing the {@link Channel} object
     * instead never matches a key of the ID-keyed cache and makes the concurrent map throw a
     * {@link NullPointerException} when the channel was not registered.
     *
     * @param executionVertexID vertex ID under which the task's buffer pool owner was registered
     * @param task the task whose environment lists the channel and gate IDs to remove
     */
    public void unregister(ExecutionVertexID executionVertexID, Task task) {
        Environment environment = task.getEnvironment();
        removeChannels(environment.getOutputChannelIDs().iterator());
        removeChannels(environment.getInputChannelIDs().iterator());
        Iterator<GateID> gateIds = environment.getInputGateIDs().iterator();
        while (gateIds.hasNext()) {
            LocalBufferPoolOwner gateOwner = this.localBuffersPools.remove(gateIds.next());
            if (gateOwner != null) {
                gateOwner.clearLocalBufferPool();
            }
        }
        LocalBufferPoolOwner taskOwner = this.localBuffersPools.remove(executionVertexID);
        if (taskOwner != null) {
            taskOwner.clearLocalBufferPool();
        }
        redistributeBuffers();
    }

    /** Deregisters and destroys the channels with the given IDs and evicts their receiver cache entries. */
    private void removeChannels(Iterator<ChannelID> channelIds) {
        while (channelIds.hasNext()) {
            ChannelID channelId = channelIds.next();
            Channel channel = this.channels.remove(channelId);
            if (channel != null) {
                channel.destroy();
            }
            this.receiverCache.remove(channelId);
        }
    }

    /**
     * Verifies that the global pool holds at least one buffer per channel once the channels of
     * the given task are added to the already registered ones.
     *
     * <p>The check is a direct comparison rather than a division, so a task without channels on
     * an otherwise empty manager passes instead of failing with a division by zero.
     *
     * @param task the task about to be registered
     * @throws InsufficientResourcesException if more channels than buffers would exist
     */
    private void ensureBufferAvailability(Task task) throws InsufficientResourcesException {
        Environment environment = task.getEnvironment();
        int numBuffers = this.globalBufferPool.numBuffers();
        int requiredChannels = this.channels.size() + environment.getNumberOfOutputChannels() + environment.getNumberOfInputChannels();
        if (requiredChannels > numBuffers) {
            throw new InsufficientResourcesException(String.format("%s has not enough buffers to safely execute %s (%d buffers missing)", this.connectionInfo.hostname(), environment.getTaskName(), Integer.valueOf(requiredChannels - numBuffers)));
        }
    }

    /**
     * Recomputes the designated number of buffers of every registered buffer pool owner.
     *
     * <p>Each owner is assigned {@code ceil(buffersPerChannel * owner.getNumberOfChannels())}
     * buffers, where {@code buffersPerChannel} is the fractional share of the global pool per
     * registered channel. The share is computed in floating point; an integer division would
     * truncate it before the rounding up takes place.
     *
     * @throws RuntimeException if fewer buffers than channels are available
     */
    private void redistributeBuffers() {
        int numChannels = this.channels.size();
        if (this.localBuffersPools.isEmpty() || numChannels == 0) {
            return;
        }
        double buffersPerChannel = this.globalBufferPool.numBuffers() / (double) numChannels;
        if (buffersPerChannel < 1.0d) {
            throw new RuntimeException("System has not enough buffers to execute tasks.");
        }
        for (LocalBufferPoolOwner owner : this.localBuffersPools.values()) {
            int designatedBuffers = (int) Math.ceil(buffersPerChannel * owner.getNumberOfChannels());
            owner.setDesignatedNumberOfBuffers(designatedBuffers);
        }
    }

    /**
     * Recycles the buffer attached to the given envelope, if it carries one.
     *
     * @param envelope the envelope that is being dropped
     */
    private void releaseEnvelope(Envelope envelope) {
        final Buffer attached = envelope.getBuffer();
        if (attached == null) {
            return;
        }
        attached.recycleBuffer();
    }

    /**
     * Caches a receiver list built from {@code channelID2} under {@code channelID}, replacing any
     * existing entry. A replaced entry is reported with a warning.
     *
     * @param channelID cache key
     * @param channelID2 channel ID the new receiver list is created from
     */
    private void addReceiverListHint(ChannelID channelID, ChannelID channelID2) {
        final EnvelopeReceiverList hint = new EnvelopeReceiverList(channelID2);
        final EnvelopeReceiverList previous = this.receiverCache.put(channelID, hint);
        if (previous == null) {
            return;
        }
        LOG.warn("Receiver cache already contained entry for " + channelID);
    }

    /**
     * Caches a receiver list built from the given remote receiver under {@code channelID},
     * replacing any existing entry. A replaced entry is reported with a warning.
     *
     * @param channelID cache key
     * @param remoteReceiver remote receiver the new receiver list is created from
     */
    private void addReceiverListHint(ChannelID channelID, RemoteReceiver remoteReceiver) {
        final EnvelopeReceiverList hint = new EnvelopeReceiverList(remoteReceiver);
        final EnvelopeReceiverList previous = this.receiverCache.put(channelID, hint);
        if (previous == null) {
            return;
        }
        LOG.warn("Receiver cache already contained entry for " + channelID);
    }

    /**
     * Enqueues a sender hint for the given remote receiver when the envelope's source is a
     * registered output channel.
     *
     * <p>Nothing is sent if the source channel is unknown (an error is logged) or if it is an
     * input channel. The hint carries this instance's address together with the connection index
     * of the given remote receiver.
     *
     * @param envelope envelope whose source channel triggers the hint
     * @param remoteReceiver receiver the hint is enqueued for
     * @throws IOException declared by the signature; presumably raised when creating or enqueueing
     *         the hint envelope fails (not visible here)
     */
    private void generateSenderHint(Envelope envelope, RemoteReceiver remoteReceiver) throws IOException {
        final Channel source = this.channels.get(envelope.getSource());
        if (source == null) {
            LOG.error("Cannot find channel for channel ID " + envelope.getSource());
            return;
        }
        if (source.isInputChannel()) {
            return;
        }
        final RemoteReceiver ourEndpoint = new RemoteReceiver(this.ourAddress, remoteReceiver.getConnectionIndex());
        final Envelope hintEnvelope = SenderHintEvent.createEnvelopeWithEvent(envelope, source.getConnectedId(), ourEndpoint);
        this.networkConnectionManager.enqueue(hintEnvelope, remoteReceiver);
    }

    /**
     * Returns the receiver list for the given channel, from the cache if present, otherwise by
     * asking the channel lookup service until it gives a definitive answer.
     *
     * <p>While the service reports the receiver as not ready, the lookup is retried every 500 ms.
     * A successful result is stored in the receiver cache.
     *
     * @param jobID job the channel belongs to
     * @param channelID channel whose receivers are looked up
     * @param z if {@code true}, failures are reported by exception; if {@code false}, {@code null}
     *          is returned instead
     * @return the receiver list, or {@code null} when {@code z} is {@code false} and the job is
     *         aborting, the receiver was not found, or the wait was interrupted
     * @throws IOException if {@code z} is {@code true} and the receiver was not found or the wait
     *         was interrupted (the interruption is attached as the cause)
     * @throws CancelTaskException if {@code z} is {@code true} and the job is aborting
     * @throws IllegalStateException if the lookup response matches none of the known states
     */
    private EnvelopeReceiverList getReceiverList(JobID jobID, ChannelID channelID, boolean z) throws IOException {
        EnvelopeReceiverList cached = this.receiverCache.get(channelID);
        if (cached != null) {
            return cached;
        }
        while (true) {
            ConnectionInfoLookupResponse response;
            // Calls to the lookup service are serialized on the service object.
            synchronized (this.channelLookupService) {
                response = this.channelLookupService.lookupConnectionInfo(this.connectionInfo, jobID, channelID);
            }
            if (response.receiverReady()) {
                EnvelopeReceiverList receiverList = new EnvelopeReceiverList(response);
                this.receiverCache.put(channelID, receiverList);
                if (LOG.isDebugEnabled()) {
                    boolean local = receiverList.hasLocalReceiver();
                    Object receiver = local ? receiverList.getLocalReceiver() : receiverList.getRemoteReceiver();
                    LOG.debug(String.format("Receiver for %s: %s [%s])", channelID, receiver, local ? "local" : "remote"));
                }
                return receiverList;
            }
            if (response.receiverNotReady()) {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    // NOTE(review): the interrupt status is not restored here, as before;
                    // confirm whether callers rely on that before adding Thread.interrupt().
                    if (z) {
                        throw new IOException("Lookup was interrupted.", e);
                    }
                    return null;
                }
                continue;
            }
            if (response.isJobAborting()) {
                if (z) {
                    throw new CancelTaskException();
                }
                return null;
            }
            if (response.receiverNotFound()) {
                if (z) {
                    throw new IOException("Could not find the receiver for Job " + jobID + ", channel with source id " + channelID);
                }
                return null;
            }
            throw new IllegalStateException("Unrecognized response to channel lookup.");
        }
    }

    /**
     * Evicts the receiver cache entries of the given channels, so their receivers are resolved
     * afresh on the next dispatch.
     *
     * @param set IDs of the channels whose cached receiver lists are dropped
     */
    public void invalidateLookupCacheEntries(Set<ChannelID> set) {
        for (ChannelID staleId : set) {
            this.receiverCache.remove(staleId);
        }
    }

    /**
     * Forwards an envelope produced by an output channel to its receiver.
     *
     * <p>For a local receiver the envelope's buffer (if any) is copied into a buffer requested
     * from the receiving input channel, the source buffer is recycled and the envelope is queued
     * at that channel. For a remote receiver the envelope is enqueued at the network connection
     * manager, preceded by a sender hint when its sequence number is 0.
     *
     * <p>Buffers are recycled here only when the envelope was <em>not</em> handed over; after a
     * successful hand-over the buffer belongs to the receiver and must not be recycled again.
     *
     * @param envelope the envelope to dispatch
     * @throws IOException if the receiver cannot be resolved, the local receiver is not an input
     *         channel, or requesting the destination buffer is interrupted
     * @throws LocalReceiverCancelledException if the local receiver is no longer registered
     * @throws InterruptedException declared by the dispatcher interface
     */
    @Override // org.apache.flink.runtime.io.network.EnvelopeDispatcher
    public void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException {
        EnvelopeReceiverList receiverList = getReceiverListForEnvelope(envelope, true);
        Buffer sourceBuffer = envelope.getBuffer();
        Buffer destinationBuffer = null;
        boolean delivered = false;
        try {
            if (receiverList.hasLocalReceiver()) {
                ChannelID localReceiver = receiverList.getLocalReceiver();
                Channel channel = this.channels.get(localReceiver);
                if (channel == null) {
                    throw new LocalReceiverCancelledException(localReceiver);
                }
                if (!channel.isInputChannel()) {
                    throw new IOException("Local receiver " + localReceiver + " is not an input channel.");
                }
                InputChannel inputChannel = (InputChannel) channel;
                if (sourceBuffer != null) {
                    try {
                        destinationBuffer = inputChannel.requestBufferBlocking(sourceBuffer.size());
                        sourceBuffer.copyToBuffer(destinationBuffer);
                        envelope.setBuffer(destinationBuffer);
                        sourceBuffer.recycleBuffer();
                    } catch (InterruptedException e) {
                        // Same exception type and message as before, now with the cause attached.
                        throw new IOException(e.getMessage(), e);
                    }
                }
                inputChannel.queueEnvelope(envelope);
                delivered = true;
            } else if (receiverList.hasRemoteReceiver()) {
                RemoteReceiver remoteReceiver = receiverList.getRemoteReceiver();
                if (envelope.getSequenceNumber() == 0) {
                    generateSenderHint(envelope, remoteReceiver);
                }
                this.networkConnectionManager.enqueue(envelope, remoteReceiver);
                delivered = true;
            }
        } finally {
            // Clean up only on failure or when no receiver took the envelope.
            if (!delivered) {
                if (sourceBuffer != null) {
                    sourceBuffer.recycleBuffer();
                }
                if (destinationBuffer != null) {
                    destinationBuffer.recycleBuffer();
                }
            }
        }
    }

    /**
     * Forwards a buffer-less envelope sent by an input channel back towards its sender.
     *
     * <p>A local receiver must be a registered output channel and gets the envelope queued
     * directly. A remote receiver gets it via the network connection manager, preceded by a
     * sender hint when the sequence number is 0. With neither kind of receiver nothing happens.
     *
     * @param envelope the envelope to dispatch; must not carry a buffer
     * @throws RuntimeException if the envelope carries a buffer
     * @throws IOException if the receiver cannot be resolved or the local receiver is an input channel
     * @throws LocalReceiverCancelledException if the local receiver is no longer registered
     * @throws InterruptedException declared by the dispatcher interface
     */
    @Override // org.apache.flink.runtime.io.network.EnvelopeDispatcher
    public void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException {
        if (envelope.getBuffer() != null) {
            throw new RuntimeException("Error: This method can only process envelopes without buffers.");
        }
        final EnvelopeReceiverList receivers = getReceiverListForEnvelope(envelope, true);

        if (receivers.hasLocalReceiver()) {
            final ChannelID targetId = receivers.getLocalReceiver();
            final Channel target = this.channels.get(targetId);
            if (target == null) {
                throw new LocalReceiverCancelledException(targetId);
            }
            if (target.isInputChannel()) {
                throw new IOException("Local receiver " + targetId + " of backward event is not an output channel.");
            }
            ((OutputChannel) target).queueEnvelope(envelope);
            return;
        }

        if (receivers.hasRemoteReceiver()) {
            final RemoteReceiver remote = receivers.getRemoteReceiver();
            if (envelope.getSequenceNumber() == 0) {
                generateSenderHint(envelope, remote);
            }
            this.networkConnectionManager.enqueue(envelope, remote);
        }
    }

    /**
     * Handles an envelope that arrived over the network.
     *
     * <p>A sender hint whose source is a registered channel only updates the receiver cache.
     * Every other envelope is queued at its local receiver channel. If the receiver cannot be
     * resolved or its channel is no longer registered, the envelope's buffer is recycled and the
     * envelope is dropped.
     *
     * @param envelope the incoming envelope
     * @throws IOException if the resolved receiver list has no local receiver or has a remote one
     * @throws InterruptedException declared by the dispatcher interface
     */
    @Override // org.apache.flink.runtime.io.network.EnvelopeDispatcher
    public void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException {
        if (SenderHintEvent.isSenderHintEvent(envelope)) {
            final SenderHintEvent hint = (SenderHintEvent) envelope.deserializeEvents().get(0);
            if (this.channels.get(hint.getSource()) != null) {
                addReceiverListHint(hint.getSource(), hint.getRemoteReceiver());
                return;
            }
            // Hint for an unknown channel: falls through to the regular path below.
        }

        final EnvelopeReceiverList receivers = getReceiverListForEnvelope(envelope, false);
        if (receivers == null) {
            releaseEnvelope(envelope);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping envelope for cleaned up receiver.");
            }
            return;
        }

        final boolean singleLocalReceiver = receivers.hasLocalReceiver() && !receivers.hasRemoteReceiver();
        if (!singleLocalReceiver) {
            throw new IOException("Bug in network stack: Envelope dispatched from the incoming network pipe has no local receiver or has a remote receiver");
        }

        final ChannelID targetId = receivers.getLocalReceiver();
        final Channel target = this.channels.get(targetId);
        if (target == null) {
            releaseEnvelope(envelope);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping envelope for cancelled receiver " + targetId);
            }
            return;
        }
        target.queueEnvelope(envelope);
    }

    /**
     * Resolves the receiver list for the envelope's job and source channel. If the lookup fails
     * with an {@link IOException} or a {@link CancelTaskException}, the envelope's buffer is
     * recycled before the exception is rethrown.
     *
     * @param envelope envelope whose receivers are needed
     * @param z passed through to the lookup: {@code true} reports failures by exception,
     *          {@code false} by a {@code null} result
     * @return the receiver list, or {@code null} if the lookup returned none
     * @throws IOException if the lookup fails with an I/O error
     */
    private final EnvelopeReceiverList getReceiverListForEnvelope(Envelope envelope, boolean z) throws IOException {
        try {
            final EnvelopeReceiverList resolved = getReceiverList(envelope.getJobID(), envelope.getSource(), z);
            return resolved;
        } catch (IOException lookupFailure) {
            releaseEnvelope(envelope);
            throw lookupFailure;
        } catch (CancelTaskException cancelled) {
            releaseEnvelope(envelope);
            throw cancelled;
        }
    }

    /**
     * Returns the buffer provider for data sent from the given source channel.
     *
     * <p>This is the local receiving input channel itself. If the receiver cannot be resolved or
     * its channel is no longer registered, the discard buffer pool is returned instead.
     *
     * @param jobID job the channel belongs to
     * @param channelID source channel whose receiver provides the buffers
     * @return the receiving input channel, or the discard buffer pool
     * @throws IOException if the receiver is not exactly one local endpoint, or is not an input channel
     */
    @Override // org.apache.flink.runtime.io.network.bufferprovider.BufferProviderBroker
    public BufferProvider getBufferProvider(JobID jobID, ChannelID channelID) throws IOException {
        final EnvelopeReceiverList receivers = getReceiverList(jobID, channelID, false);
        if (receivers == null) {
            return this.discardBufferPool;
        }

        final boolean singleLocalReceiver = receivers.hasLocalReceiver() && !receivers.hasRemoteReceiver();
        if (!singleLocalReceiver) {
            throw new IOException("The destination to be looked up is not a single local endpoint.");
        }

        final ChannelID targetId = receivers.getLocalReceiver();
        final Channel target = this.channels.get(targetId);
        if (target == null) {
            return this.discardBufferPool;
        }
        if (!target.isInputChannel()) {
            throw new IOException("Channel context for local receiver " + targetId + " is not an input channel context");
        }
        return (InputChannel) target;
    }

    /**
     * Prints a buffer utilization report to standard output: the number of available global
     * buffers, followed by the report of each registered buffer pool owner and the queued
     * envelopes of each registered input channel.
     */
    public void logBufferUtilization() {
        final long now = System.currentTimeMillis();
        System.out.println("Buffer utilization at " + now);
        System.out.println("\tUnused global buffers: " + this.globalBufferPool.numAvailableBuffers());

        System.out.println("\tLocal buffer pool status:");
        for (LocalBufferPoolOwner owner : this.localBuffersPools.values()) {
            owner.logBufferUtilization();
        }

        System.out.println("\tIncoming connections:");
        for (Channel registered : this.channels.values()) {
            if (!registered.isInputChannel()) {
                continue;
            }
            ((InputChannel) registered).logQueuedEnvelopes();
        }
    }
}
