/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.RuntimeEnvironment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.network.Buffer;
import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
import org.apache.flink.runtime.io.network.Envelope;
import org.apache.flink.runtime.io.network.EnvelopeDispatcher;
import org.apache.flink.runtime.io.network.EnvelopeReceiverList;
import org.apache.flink.runtime.io.network.InsufficientResourcesException;
import org.apache.flink.runtime.io.network.LocalReceiverCancelledException;
import org.apache.flink.runtime.io.network.NetworkConnectionManager;
import org.apache.flink.runtime.io.network.RemoteReceiver;
import org.apache.flink.runtime.io.network.SenderHintEvent;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProviderBroker;
import org.apache.flink.runtime.io.network.bufferprovider.DiscardBufferPool;
import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
import org.apache.flink.runtime.io.network.channels.Channel;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.io.network.channels.ChannelType;
import org.apache.flink.runtime.io.network.channels.InputChannel;
import org.apache.flink.runtime.io.network.channels.OutputChannel;
import org.apache.flink.runtime.io.network.gates.GateID;
import org.apache.flink.runtime.io.network.gates.InputGate;
import org.apache.flink.runtime.io.network.gates.OutputGate;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps track of the channels and local buffer pool owners of the tasks registered with it
 * and routes envelopes between local channels and the {@link NetworkConnectionManager}.
 *
 * <p>Receiver lists are cached per source channel ID. Cache misses are resolved through the
 * {@link ChannelLookupProtocol} handed to the constructor.
 */
public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelManager.class);

    /** Pause between two lookups while the lookup service reports the receiver as not ready. */
    private static final long LOOKUP_RETRY_DELAY_MS = 100L;

    private final ChannelLookupProtocol channelLookupService;
    private final InstanceConnectionInfo connectionInfo;
    /** All channels (input and output) of the currently registered tasks, by channel ID. */
    private final Map<ChannelID, Channel> channels;
    /** Buffer pool owners, keyed by execution ID (environments) or gate ID (input gates). */
    private final Map<AbstractID, LocalBufferPoolOwner> localBuffersPools;
    /** Cached receiver lists, keyed by the ID of the source channel. */
    private final Map<ChannelID, EnvelopeReceiverList> receiverCache;
    private final GlobalBufferPool globalBufferPool;
    private final NetworkConnectionManager networkConnectionManager;
    private final InetSocketAddress ourAddress;
    private final DiscardBufferPool discardBufferPool;

    /**
     * Creates the channel manager, its global buffer pool, and starts the given network
     * connection manager with this instance as its dispatcher.
     *
     * @param channelLookupService service used to resolve receivers that are not cached
     * @param connectionInfo connection info of this instance; its address and data port form
     *        the address announced in sender hints
     * @param numNetworkBuffers number of buffers passed to the {@link GlobalBufferPool}
     * @param networkBufferSize buffer size passed to the {@link GlobalBufferPool}
     * @param networkConnectionManager connection manager used for remote transfers
     * @throws IOException if the global buffer pool cannot be instantiated
     */
    public ChannelManager(ChannelLookupProtocol channelLookupService, InstanceConnectionInfo connectionInfo, int numNetworkBuffers, int networkBufferSize, NetworkConnectionManager networkConnectionManager) throws IOException {
        this.channelLookupService = channelLookupService;
        this.connectionInfo = connectionInfo;
        try {
            this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);
        }
        catch (Throwable e) {
            throw new IOException("Failed to instantiate GlobalBufferPool.", e);
        }
        this.channels = new ConcurrentHashMap<ChannelID, Channel>();
        this.receiverCache = new ConcurrentHashMap<ChannelID, EnvelopeReceiverList>();
        this.localBuffersPools = new ConcurrentHashMap<AbstractID, LocalBufferPoolOwner>();
        this.ourAddress = new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort());
        this.discardBufferPool = new DiscardBufferPool();
        this.networkConnectionManager = networkConnectionManager;
        // Start the connection manager last: start(this) publishes this object, so every field
        // the dispatch methods touch must already be assigned.
        networkConnectionManager.start(this);
    }

    /**
     * Shuts down the network connection manager and then destroys the global buffer pool.
     *
     * @throws IOException declared for failures while shutting down
     */
    public void shutdown() throws IOException {
        this.networkConnectionManager.shutdown();
        this.globalBufferPool.destroy();
    }

    /** Returns the global buffer pool owned by this manager. */
    public GlobalBufferPool getGlobalBufferPool() {
        return this.globalBufferPool;
    }

    /**
     * Registers all channels and buffer pool owners of the given task and redistributes the
     * buffers among all registered buffer pool owners.
     *
     * @param task the task whose environment is registered
     * @throws InsufficientResourcesException if there are fewer global buffers than channels
     *         once the task's channels are counted in
     * @throws IllegalStateException if a buffer pool owner is already registered under the
     *         task's execution ID
     */
    public void register(Task task) throws InsufficientResourcesException {
        this.ensureBufferAvailability(task);
        RuntimeEnvironment environment = task.getEnvironment();
        environment.registerGlobalBufferPool(this.globalBufferPool);
        if (this.localBuffersPools.containsKey(task.getExecutionId())) {
            throw new IllegalStateException("Execution " + task.getExecutionId() + " has a previous buffer pool owner");
        }

        for (OutputGate outputGate : environment.outputGates()) {
            for (OutputChannel channel : outputGate.channels()) {
                channel.registerEnvelopeDispatcher(this);
                switch (channel.getChannelType()) {
                    case IN_MEMORY: {
                        // Envelopes sent by this channel go to the connected local channel.
                        this.addReceiverListHint(channel.getID(), channel.getConnectedId());
                        break;
                    }
                    case NETWORK: {
                        // Envelopes whose source is the connected channel go to this channel.
                        this.addReceiverListHint(channel.getConnectedId(), channel.getID());
                        break;
                    }
                }
                this.channels.put(channel.getID(), channel);
            }
        }
        this.localBuffersPools.put(task.getExecutionId(), environment);

        for (InputGate inputGate : environment.inputGates()) {
            inputGate.registerGlobalBufferPool(this.globalBufferPool);
            for (int i = 0; i < inputGate.getNumberOfInputChannels(); ++i) {
                InputChannel channel = inputGate.getInputChannel(i);
                channel.registerEnvelopeDispatcher(this);
                if (channel.getChannelType() == ChannelType.IN_MEMORY) {
                    this.addReceiverListHint(channel.getID(), channel.getConnectedId());
                }
                this.channels.put(channel.getID(), channel);
            }
            this.localBuffersPools.put(inputGate.getGateID(), inputGate);
        }
        this.redistributeBuffers();
    }

    /**
     * Removes and destroys all channels of the given task, clears the local buffer pools of its
     * input gates and of its environment, and redistributes the buffers. Does nothing if the
     * task has no environment.
     *
     * @param executionId key under which the task's environment was registered as pool owner
     * @param task the task to unregister
     */
    public void unregister(ExecutionAttemptID executionId, Task task) {
        RuntimeEnvironment environment = task.getEnvironment();
        if (environment == null) {
            return;
        }
        for (ChannelID channelID : environment.getOutputChannelIDs()) {
            this.removeChannel(channelID);
        }
        for (ChannelID channelID : environment.getInputChannelIDs()) {
            this.removeChannel(channelID);
        }
        for (GateID gateID : environment.getInputGateIDs()) {
            this.clearBufferPoolOwner(gateID);
        }
        this.clearBufferPoolOwner(executionId);
        this.redistributeBuffers();
    }

    /** Removes the channel with the given ID, destroys it and drops its receiver cache entry. */
    private void removeChannel(ChannelID channelID) {
        Channel channel = this.channels.remove(channelID);
        if (channel == null) {
            return;
        }
        channel.destroy();
        // The cache is keyed by ChannelID; removing by the Channel object would never match.
        this.receiverCache.remove(channelID);
    }

    /** Removes the buffer pool owner registered under the given ID and clears its local pool. */
    private void clearBufferPoolOwner(AbstractID ownerId) {
        LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(ownerId);
        if (bufferPool != null) {
            bufferPool.clearLocalBufferPool();
        }
    }

    /**
     * Verifies that the global pool holds at least one buffer per channel once the channels of
     * the given task are added to the already registered ones.
     *
     * @throws InsufficientResourcesException if there are fewer buffers than channels
     */
    private void ensureBufferAvailability(Task task) throws InsufficientResourcesException {
        RuntimeEnvironment env = task.getEnvironment();
        int numBuffers = this.globalBufferPool.numBuffers();
        int numChannels = this.channels.size() + env.getNumberOfOutputChannels() + env.getNumberOfInputChannels();
        if (numChannels > 0 && numBuffers < numChannels) {
            String msg = String.format("%s has not enough buffers to safely execute %s (%d buffers missing)", this.connectionInfo.getFQDNHostname(), env.getTaskName(), numChannels - numBuffers);
            throw new InsufficientResourcesException(msg);
        }
    }

    /**
     * Assigns every registered buffer pool owner its share of the global buffers, proportional
     * to its number of channels (rounded up). Does nothing while no owner or no channel is
     * registered.
     *
     * @throws RuntimeException if there is less than one buffer per registered channel
     */
    private void redistributeBuffers() {
        // Read the size once: the map is concurrent, and a second read could observe zero.
        int numChannels = this.channels.size();
        if (this.localBuffersPools.isEmpty() || numChannels == 0) {
            return;
        }
        int numBuffers = this.globalBufferPool.numBuffers();
        double buffersPerChannel = (double) numBuffers / (double) numChannels;
        if (buffersPerChannel < 1.0) {
            throw new RuntimeException("System has not enough buffers to execute tasks.");
        }
        for (LocalBufferPoolOwner bufferPool : this.localBuffersPools.values()) {
            int numDesignatedBuffers = (int) Math.ceil(buffersPerChannel * (double) bufferPool.getNumberOfChannels());
            bufferPool.setDesignatedNumberOfBuffers(numDesignatedBuffers);
        }
    }

    /** Recycles the buffer of the given envelope, if it carries one. */
    private void releaseEnvelope(Envelope envelope) {
        Buffer buffer = envelope.getBuffer();
        if (buffer != null) {
            buffer.recycleBuffer();
        }
    }

    /**
     * Caches a receiver list with a single local receiver for the given source channel,
     * replacing (and warning about) any previous entry.
     */
    private void addReceiverListHint(ChannelID source, ChannelID localReceiver) {
        EnvelopeReceiverList receiverList = new EnvelopeReceiverList(localReceiver);
        if (this.receiverCache.put(source, receiverList) != null) {
            LOG.warn("Receiver cache already contained entry for {}", source);
        }
    }

    /**
     * Caches a receiver list with a single remote receiver for the given source channel,
     * replacing (and warning about) any previous entry.
     */
    private void addReceiverListHint(ChannelID source, RemoteReceiver remoteReceiver) {
        EnvelopeReceiverList receiverList = new EnvelopeReceiverList(remoteReceiver);
        if (this.receiverCache.put(source, receiverList) != null) {
            LOG.warn("Receiver cache already contained entry for {}", source);
        }
    }

    /**
     * Enqueues a sender hint envelope for the given remote receiver, announcing this instance's
     * address for the channel connected to the envelope's source. Nothing is sent when the
     * source channel is unknown (logged as an error) or is an input channel.
     */
    private void generateSenderHint(Envelope envelope, RemoteReceiver receiver) throws IOException {
        Channel channel = this.channels.get(envelope.getSource());
        if (channel == null) {
            LOG.error("Cannot find channel for channel ID {}", envelope.getSource());
            return;
        }
        if (channel.isInputChannel()) {
            return;
        }
        ChannelID targetChannelID = channel.getConnectedId();
        int connectionIndex = receiver.getConnectionIndex();
        RemoteReceiver ourAddress = new RemoteReceiver(this.ourAddress, connectionIndex);
        Envelope senderHint = SenderHintEvent.createEnvelopeWithEvent(envelope, targetChannelID, ourAddress);
        this.networkConnectionManager.enqueue(senderHint, receiver);
    }

    /**
     * Returns the receiver list for the given source channel, from the cache if present and
     * otherwise from the lookup service. While the service reports the receiver as not ready,
     * the lookup is retried after a short sleep.
     *
     * @param jobID job the source channel belongs to
     * @param sourceChannelID ID of the channel the envelope originates from
     * @param reportException if {@code true}, failures (interrupt, aborting job, receiver not
     *        found) are reported as exceptions; if {@code false}, {@code null} is returned
     * @return the receiver list, or {@code null} on failure when {@code reportException} is
     *         {@code false}
     * @throws IOException if the lookup was interrupted or the receiver was not found, and
     *         {@code reportException} is set
     * @throws CancelTaskException if the job is aborting and {@code reportException} is set
     * @throws IllegalStateException if the lookup response matches none of the known states
     */
    private EnvelopeReceiverList getReceiverList(JobID jobID, ChannelID sourceChannelID, boolean reportException) throws IOException {
        EnvelopeReceiverList cached = this.receiverCache.get(sourceChannelID);
        if (cached != null) {
            return cached;
        }

        ConnectionInfoLookupResponse lookupResponse;
        while (true) {
            synchronized (this.channelLookupService) {
                lookupResponse = this.channelLookupService.lookupConnectionInfo(this.connectionInfo, jobID, sourceChannelID);
            }
            if (lookupResponse.receiverReady()) {
                break;
            }
            if (lookupResponse.receiverNotReady()) {
                try {
                    Thread.sleep(LOOKUP_RETRY_DELAY_MS);
                }
                catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller's thread instead of swallowing it.
                    Thread.currentThread().interrupt();
                    if (reportException) {
                        throw new IOException("Lookup was interrupted.", e);
                    }
                    return null;
                }
                continue;
            }
            if (lookupResponse.isJobAborting()) {
                if (reportException) {
                    throw new CancelTaskException();
                }
                return null;
            }
            if (lookupResponse.receiverNotFound()) {
                if (reportException) {
                    throw new IOException("Could not find the receiver for Job " + jobID + ", channel with source id " + sourceChannelID);
                }
                return null;
            }
            throw new IllegalStateException("Unrecognized response to channel lookup.");
        }

        EnvelopeReceiverList receiverList = new EnvelopeReceiverList(lookupResponse);
        this.receiverCache.put(sourceChannelID, receiverList);
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Receiver for %s: %s [%s])", sourceChannelID, receiverList.hasLocalReceiver() ? receiverList.getLocalReceiver() : receiverList.getRemoteReceiver(), receiverList.hasLocalReceiver() ? "local" : "remote"));
        }
        return receiverList;
    }

    /** Removes the cached receiver lists of the given source channel IDs. */
    public void invalidateLookupCacheEntries(Set<ChannelID> channelIDs) {
        for (ChannelID id : channelIDs) {
            this.receiverCache.remove(id);
        }
    }

    /**
     * Dispatches an envelope coming from a local output channel. For a local receiver the
     * payload is copied into a buffer requested from the receiving input channel and the
     * envelope is queued there; for a remote receiver the envelope is enqueued at the network
     * connection manager, preceded by a sender hint when its sequence number is 0.
     *
     * <p>If dispatching fails, the buffers still held by this method are recycled.
     *
     * @throws LocalReceiverCancelledException if the local receiver is no longer registered
     * @throws IOException if the local receiver is not an input channel, if requesting the
     *         destination buffer was interrupted, or if the receiver lookup fails
     */
    @Override
    public void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException {
        EnvelopeReceiverList receiverList = this.getReceiverListForEnvelope(envelope, true);
        Buffer srcBuffer = envelope.getBuffer();
        Buffer destBuffer = null;
        boolean success = false;
        try {
            if (receiverList.hasLocalReceiver()) {
                ChannelID receiver = receiverList.getLocalReceiver();
                Channel channel = this.channels.get(receiver);
                if (channel == null) {
                    throw new LocalReceiverCancelledException(receiver);
                }
                if (!channel.isInputChannel()) {
                    throw new IOException("Local receiver " + receiver + " is not an input channel.");
                }
                InputChannel inputChannel = (InputChannel) channel;
                if (srcBuffer != null) {
                    try {
                        destBuffer = inputChannel.requestBufferBlocking(srcBuffer.size());
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IOException(e.getMessage(), e);
                    }
                    srcBuffer.copyToBuffer(destBuffer);
                    envelope.setBuffer(destBuffer);
                    srcBuffer.recycleBuffer();
                    // Already recycled: make sure the failure path below does not recycle it again.
                    srcBuffer = null;
                }
                inputChannel.queueEnvelope(envelope);
                success = true;
            } else if (receiverList.hasRemoteReceiver()) {
                RemoteReceiver remoteReceiver = receiverList.getRemoteReceiver();
                if (envelope.getSequenceNumber() == 0) {
                    this.generateSenderHint(envelope, remoteReceiver);
                }
                this.networkConnectionManager.enqueue(envelope, remoteReceiver);
                success = true;
            }
        }
        finally {
            if (!success) {
                if (srcBuffer != null) {
                    srcBuffer.recycleBuffer();
                }
                if (destBuffer != null) {
                    destBuffer.recycleBuffer();
                }
            }
        }
    }

    /**
     * Dispatches a buffer-less envelope coming from a local input channel, either to the local
     * output channel it is addressed to or to the remote receiver (preceded by a sender hint
     * when its sequence number is 0).
     *
     * @throws RuntimeException if the envelope carries a buffer
     * @throws LocalReceiverCancelledException if the local receiver is no longer registered
     * @throws IOException if the local receiver is an input channel or the lookup fails
     */
    @Override
    public void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException {
        if (envelope.getBuffer() != null) {
            throw new RuntimeException("Error: This method can only process envelopes without buffers.");
        }
        EnvelopeReceiverList receiverList = this.getReceiverListForEnvelope(envelope, true);
        if (receiverList.hasLocalReceiver()) {
            ChannelID receiver = receiverList.getLocalReceiver();
            Channel channel = this.channels.get(receiver);
            if (channel == null) {
                throw new LocalReceiverCancelledException(receiver);
            }
            if (channel.isInputChannel()) {
                throw new IOException("Local receiver " + receiver + " of backward event is not an output channel.");
            }
            OutputChannel outputChannel = (OutputChannel) channel;
            outputChannel.queueEnvelope(envelope);
        } else if (receiverList.hasRemoteReceiver()) {
            RemoteReceiver remoteReceiver = receiverList.getRemoteReceiver();
            if (envelope.getSequenceNumber() == 0) {
                this.generateSenderHint(envelope, remoteReceiver);
            }
            this.networkConnectionManager.enqueue(envelope, remoteReceiver);
        }
    }

    /**
     * Dispatches an envelope received from the network. A sender hint addressed to a channel
     * registered here only updates the receiver cache. Any other envelope is queued at its
     * local receiver, or released and dropped when the receiver cannot be resolved or is no
     * longer registered.
     *
     * @throws IOException if the resolved receiver list is not exactly one local receiver
     */
    @Override
    public void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException {
        if (SenderHintEvent.isSenderHintEvent(envelope)) {
            SenderHintEvent seh = (SenderHintEvent) envelope.deserializeEvents().get(0);
            // Only consume the hint if its source channel is registered here.
            if (this.channels.get(seh.getSource()) != null) {
                this.addReceiverListHint(seh.getSource(), seh.getRemoteReceiver());
                return;
            }
        }

        EnvelopeReceiverList receiverList = this.getReceiverListForEnvelope(envelope, false);
        if (receiverList == null) {
            this.releaseEnvelope(envelope);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping envelope for cleaned up receiver.");
            }
            return;
        }
        if (!receiverList.hasLocalReceiver() || receiverList.hasRemoteReceiver()) {
            throw new IOException("Bug in network stack: Envelope dispatched from the incoming network pipe has no local receiver or has a remote receiver");
        }

        ChannelID localReceiver = receiverList.getLocalReceiver();
        Channel channel = this.channels.get(localReceiver);
        if (channel == null) {
            this.releaseEnvelope(envelope);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping envelope for cancelled receiver {}", localReceiver);
            }
        } else {
            channel.queueEnvelope(envelope);
        }
    }

    /**
     * Resolves the receiver list for the envelope's job and source channel. If the lookup
     * throws, the envelope's buffer is recycled before the failure is propagated.
     *
     * @return the receiver list; may be {@code null} only when {@code reportException} is
     *         {@code false}
     */
    private final EnvelopeReceiverList getReceiverListForEnvelope(Envelope envelope, boolean reportException) throws IOException {
        try {
            return this.getReceiverList(envelope.getJobID(), envelope.getSource(), reportException);
        }
        catch (IOException e) {
            this.releaseEnvelope(envelope);
            throw e;
        }
        catch (CancelTaskException e) {
            this.releaseEnvelope(envelope);
            throw e;
        }
        catch (Throwable t) {
            this.releaseEnvelope(envelope);
            ExceptionUtils.rethrow(t, "Error while requesting receiver list.");
            // Only needed to satisfy the compiler's return-path analysis.
            return null;
        }
    }

    /**
     * Returns the buffer provider for data sent by the given source channel: the local input
     * channel it is connected to, or the discard pool when the receiver cannot be resolved or
     * is no longer registered.
     *
     * @throws IOException if the destination is not a single local endpoint or the local
     *         receiver is not an input channel
     */
    @Override
    public BufferProvider getBufferProvider(JobID jobID, ChannelID sourceChannelID) throws IOException {
        EnvelopeReceiverList receiverList = this.getReceiverList(jobID, sourceChannelID, false);
        if (receiverList == null) {
            return this.discardBufferPool;
        }
        if (!receiverList.hasLocalReceiver() || receiverList.hasRemoteReceiver()) {
            throw new IOException("The destination to be looked up is not a single local endpoint.");
        }
        ChannelID localReceiver = receiverList.getLocalReceiver();
        Channel channel = this.channels.get(localReceiver);
        if (channel == null) {
            return this.discardBufferPool;
        }
        if (!channel.isInputChannel()) {
            throw new IOException("Channel context for local receiver " + localReceiver + " is not an input channel context");
        }
        return (InputChannel) channel;
    }

    /**
     * Prints the number of available global buffers to standard output and asks every local
     * buffer pool owner and every input channel to log its own utilization.
     */
    public void logBufferUtilization() {
        System.out.println("Buffer utilization at " + System.currentTimeMillis());
        System.out.println("\tUnused global buffers: " + this.globalBufferPool.numAvailableBuffers());
        System.out.println("\tLocal buffer pool status:");
        for (LocalBufferPoolOwner bufferPool : this.localBuffersPools.values()) {
            bufferPool.logBufferUtilization();
        }
        System.out.println("\tIncoming connections:");
        for (Channel channel : this.channels.values()) {
            if (channel.isInputChannel()) {
                ((InputChannel) channel).logQueuedEnvelopes();
            }
        }
    }
}

