/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnknownInputChannel;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleUtils;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory that builds {@link SingleInputGate} instances and wires up one {@link InputChannel}
 * per shuffle descriptor of the consumed result.
 *
 * <p>The network settings (credit-based mode, partition request backoffs, buffers per channel and
 * floating buffers per gate) are read once from the {@link NettyShuffleEnvironmentConfiguration}
 * passed to the constructor and reused for every gate this factory creates.
 */
public class SingleInputGateFactory {
    private static final Logger LOG = LoggerFactory.getLogger(SingleInputGateFactory.class);
    @Nonnull
    private final ResourceID taskExecutorResourceId;
    private final boolean isCreditBased;
    private final int partitionRequestInitialBackoff;
    private final int partitionRequestMaxBackoff;
    @Nonnull
    private final ConnectionManager connectionManager;
    @Nonnull
    private final ResultPartitionManager partitionManager;
    @Nonnull
    private final TaskEventPublisher taskEventPublisher;
    @Nonnull
    private final NetworkBufferPool networkBufferPool;
    private final int networkBuffersPerChannel;
    private final int floatingNetworkBuffersPerGate;

    /**
     * Creates a factory bound to one task executor and its network services.
     *
     * @param taskExecutorResourceId id used to decide whether a known partition is local
     * @param networkConfig source of the credit-based flag, backoff values and buffer counts
     * @param connectionManager handed to remote and unknown input channels
     * @param partitionManager handed to local and unknown input channels
     * @param taskEventPublisher handed to local and unknown input channels
     * @param networkBufferPool handed to remote and unknown input channels and used as the
     *     {@link BufferPoolFactory} for the gate's buffer pool
     */
    public SingleInputGateFactory(@Nonnull ResourceID taskExecutorResourceId, @Nonnull NettyShuffleEnvironmentConfiguration networkConfig, @Nonnull ConnectionManager connectionManager, @Nonnull ResultPartitionManager partitionManager, @Nonnull TaskEventPublisher taskEventPublisher, @Nonnull NetworkBufferPool networkBufferPool) {
        this.taskExecutorResourceId = taskExecutorResourceId;
        this.isCreditBased = networkConfig.isCreditBased();
        this.partitionRequestInitialBackoff = networkConfig.partitionRequestInitialBackoff();
        this.partitionRequestMaxBackoff = networkConfig.partitionRequestMaxBackoff();
        this.networkBuffersPerChannel = networkConfig.networkBuffersPerChannel();
        this.floatingNetworkBuffersPerGate = networkConfig.floatingNetworkBuffersPerGate();
        this.connectionManager = connectionManager;
        this.partitionManager = partitionManager;
        this.taskEventPublisher = taskEventPublisher;
        this.networkBufferPool = networkBufferPool;
    }

    /**
     * Creates an input gate for the given deployment descriptor and registers all of its input
     * channels on it.
     *
     * @param owningTaskName name of the owning task; passed to the gate and used in the debug log
     * @param igdd deployment descriptor describing the consumed result and its shuffle descriptors
     * @param partitionProducerStateProvider passed through to the created gate
     * @param metrics passed through to every created input channel
     * @return the new gate with one input channel set per shuffle descriptor
     */
    public SingleInputGate create(@Nonnull String owningTaskName, @Nonnull InputGateDeploymentDescriptor igdd, @Nonnull PartitionProducerStateProvider partitionProducerStateProvider, @Nonnull InputChannelMetrics metrics) {
        // Read once; both the buffer pool sizing and the gate itself need these values.
        int numberOfInputChannels = igdd.getShuffleDescriptors().length;
        ResultPartitionType consumedPartitionType = igdd.getConsumedPartitionType();

        SupplierWithException<BufferPool, IOException> bufferPoolFactory = SingleInputGateFactory.createBufferPoolFactory(this.networkBufferPool, this.isCreditBased, this.networkBuffersPerChannel, this.floatingNetworkBuffersPerGate, numberOfInputChannels, consumedPartitionType);
        SingleInputGate inputGate = new SingleInputGate(owningTaskName, igdd.getConsumedResultId(), consumedPartitionType, igdd.getConsumedSubpartitionIndex(), numberOfInputChannels, partitionProducerStateProvider, this.isCreditBased, bufferPoolFactory);
        this.createInputChannels(owningTaskName, igdd, inputGate, metrics);
        return inputGate;
    }

    /**
     * Creates one input channel per shuffle descriptor, registers each on the gate under the
     * partition id reported by the channel, and logs how many local, remote and unknown channels
     * were created.
     */
    private void createInputChannels(String owningTaskName, InputGateDeploymentDescriptor inputGateDeploymentDescriptor, SingleInputGate inputGate, InputChannelMetrics metrics) {
        ShuffleDescriptor[] shuffleDescriptors = inputGateDeploymentDescriptor.getShuffleDescriptors();
        InputChannel[] inputChannels = new InputChannel[shuffleDescriptors.length];
        ChannelStatistics channelStatistics = new ChannelStatistics();
        for (int i = 0; i < inputChannels.length; ++i) {
            inputChannels[i] = this.createInputChannel(inputGate, i, shuffleDescriptors[i], channelStatistics, metrics);
            ResultPartitionID resultPartitionID = inputChannels[i].getPartitionId();
            inputGate.setInputChannel(resultPartitionID.getPartitionId(), inputChannels[i]);
        }
        LOG.debug("{}: Created {} input channels ({}).", owningTaskName, inputChannels.length, channelStatistics);
    }

    /**
     * Creates the channel for a single shuffle descriptor. The dispatch between the two callbacks
     * is delegated to {@link ShuffleUtils#applyWithShuffleTypeCheck}: the first callback yields an
     * {@link UnknownInputChannel}, the second delegates to
     * {@link #createKnownInputChannel} for a {@link NettyShuffleDescriptor}.
     */
    private InputChannel createInputChannel(SingleInputGate inputGate, int index, ShuffleDescriptor shuffleDescriptor, ChannelStatistics channelStatistics, InputChannelMetrics metrics) {
        return ShuffleUtils.applyWithShuffleTypeCheck(NettyShuffleDescriptor.class, shuffleDescriptor, unknownShuffleDescriptor -> {
            ++channelStatistics.numUnknownChannels;
            return new UnknownInputChannel(inputGate, index, unknownShuffleDescriptor.getResultPartitionID(), this.partitionManager, this.taskEventPublisher, this.connectionManager, this.partitionRequestInitialBackoff, this.partitionRequestMaxBackoff, metrics, this.networkBufferPool);
        }, nettyShuffleDescriptor -> this.createKnownInputChannel(inputGate, index, (NettyShuffleDescriptor)nettyShuffleDescriptor, channelStatistics, metrics));
    }

    /**
     * Creates a {@link LocalInputChannel} when the descriptor reports being local to this task
     * executor's resource id, otherwise a {@link RemoteInputChannel} using the descriptor's
     * connection id. The matching counter in {@code channelStatistics} is incremented.
     */
    private InputChannel createKnownInputChannel(SingleInputGate inputGate, int index, NettyShuffleDescriptor inputChannelDescriptor, ChannelStatistics channelStatistics, InputChannelMetrics metrics) {
        ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID();
        if (inputChannelDescriptor.isLocalTo(this.taskExecutorResourceId)) {
            ++channelStatistics.numLocalChannels;
            return new LocalInputChannel(inputGate, index, partitionId, this.partitionManager, this.taskEventPublisher, this.partitionRequestInitialBackoff, this.partitionRequestMaxBackoff, metrics);
        }
        ++channelStatistics.numRemoteChannels;
        return new RemoteInputChannel(inputGate, index, partitionId, inputChannelDescriptor.getConnectionId(), this.connectionManager, this.partitionRequestInitialBackoff, this.partitionRequestMaxBackoff, metrics, this.networkBufferPool);
    }

    /**
     * Builds a supplier that creates the gate's buffer pool when invoked; no pool is created by
     * this method itself.
     *
     * <p>The two arguments passed to {@link BufferPoolFactory#createBufferPool} (presumably the
     * required minimum and the maximum number of segments - confirm against that interface) are:
     *
     * <ul>
     *   <li>credit-based: {@code 0} and, for a bounded partition type,
     *       {@code floatingNetworkBuffersPerGate};
     *   <li>otherwise: {@code size} and, for a bounded partition type,
     *       {@code size * networkBuffersPerChannel + floatingNetworkBuffersPerGate}.
     * </ul>
     *
     * <p>For a partition type that is not bounded the second argument is
     * {@link Integer#MAX_VALUE} in both modes.
     *
     * @param bufferPoolFactory factory the returned supplier delegates to
     * @param isCreditBased selects between the two sizing schemes above
     * @param networkBuffersPerChannel per-channel buffer count, used only when not credit-based
     * @param floatingNetworkBuffersPerGate per-gate floating buffer count
     * @param size number of input channels of the gate
     * @param type consumed partition type; only {@code isBounded()} is consulted
     * @return supplier that creates the buffer pool on demand and may throw {@link IOException}
     */
    @VisibleForTesting
    static SupplierWithException<BufferPool, IOException> createBufferPoolFactory(BufferPoolFactory bufferPoolFactory, boolean isCreditBased, int networkBuffersPerChannel, int floatingNetworkBuffersPerGate, int size, ResultPartitionType type) {
        if (isCreditBased) {
            int maxNumberOfMemorySegments = type.isBounded() ? floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
            return () -> bufferPoolFactory.createBufferPool(0, maxNumberOfMemorySegments);
        }
        int maxNumberOfMemorySegments = type.isBounded() ? size * networkBuffersPerChannel + floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
        return () -> bufferPoolFactory.createBufferPool(size, maxNumberOfMemorySegments);
    }

    /**
     * Mutable counters for the kinds of input channels created for one gate; only used to build
     * the debug log message.
     */
    private static class ChannelStatistics {
        int numLocalChannels;
        int numRemoteChannels;
        int numUnknownChannels;

        private ChannelStatistics() {
        }

        @Override
        public String toString() {
            return String.format("local: %s, remote: %s, unknown: %s", this.numLocalChannels, this.numRemoteChannels, this.numUnknownChannels);
        }
    }
}

