package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleUtils;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.class */
/**
 * Factory for {@link SingleInputGate} instances.
 *
 * <p>The factory captures the shuffle-environment settings and the shared services once, at
 * construction time, and reuses them for every gate it creates. Each gate is populated with one
 * input channel per shuffle descriptor; the channel is local, remote, or unknown depending on the
 * descriptor.
 */
public class SingleInputGateFactory {
    private static final Logger LOG = LoggerFactory.getLogger(SingleInputGateFactory.class);

    /** Identity of this task executor; used to decide whether a partition is local. */
    @Nonnull
    private final ResourceID taskExecutorResourceId;

    /** Selects which buffer pool sizing is used and is forwarded to every created gate. */
    private final boolean isCreditBased;

    /** Initial partition-request backoff handed to every created channel. */
    private final int partitionRequestInitialBackoff;

    /** Maximum partition-request backoff handed to every created channel. */
    private final int partitionRequestMaxBackoff;

    @Nonnull
    private final ConnectionManager connectionManager;

    @Nonnull
    private final ResultPartitionManager partitionManager;

    @Nonnull
    private final TaskEventPublisher taskEventPublisher;

    @Nonnull
    private final NetworkBufferPool networkBufferPool;

    private final int networkBuffersPerChannel;
    private final int floatingNetworkBuffersPerGate;

    /**
     * Mutable counters of the channel kinds created for a single gate.
     *
     * <p>Within this file the counters are only incremented while channels are created and then
     * rendered into the debug log message. The private constructor restricts instantiation to the
     * enclosing factory.
     */
    public static class ChannelStatistics {
        int numLocalChannels;
        int numRemoteChannels;
        int numUnknownChannels;

        private ChannelStatistics() {
        }

        @Override
        public String toString() {
            return String.format(
                    "local: %s, remote: %s, unknown: %s",
                    this.numLocalChannels,
                    this.numRemoteChannels,
                    this.numUnknownChannels);
        }
    }

    /**
     * Creates a factory bound to the given task executor and network services.
     *
     * @param resourceID resource id of the task executor that will own the created gates
     * @param nettyShuffleEnvironmentConfiguration source of the credit-based flag, the partition
     *     request backoffs and the per-channel / per-gate buffer counts
     * @param connectionManager connection manager passed to remote and unknown channels
     * @param resultPartitionManager partition manager passed to local and unknown channels
     * @param taskEventPublisher event publisher passed to local and unknown channels
     * @param networkBufferPool buffer pool used both to create gate-level buffer pools and as an
     *     argument to remote and unknown channels
     */
    public SingleInputGateFactory(@Nonnull ResourceID resourceID, @Nonnull NettyShuffleEnvironmentConfiguration nettyShuffleEnvironmentConfiguration, @Nonnull ConnectionManager connectionManager, @Nonnull ResultPartitionManager resultPartitionManager, @Nonnull TaskEventPublisher taskEventPublisher, @Nonnull NetworkBufferPool networkBufferPool) {
        this.taskExecutorResourceId = resourceID;
        this.isCreditBased = nettyShuffleEnvironmentConfiguration.isCreditBased();
        this.partitionRequestInitialBackoff = nettyShuffleEnvironmentConfiguration.partitionRequestInitialBackoff();
        this.partitionRequestMaxBackoff = nettyShuffleEnvironmentConfiguration.partitionRequestMaxBackoff();
        this.networkBuffersPerChannel = nettyShuffleEnvironmentConfiguration.networkBuffersPerChannel();
        this.floatingNetworkBuffersPerGate = nettyShuffleEnvironmentConfiguration.floatingNetworkBuffersPerGate();
        this.connectionManager = connectionManager;
        this.partitionManager = resultPartitionManager;
        this.taskEventPublisher = taskEventPublisher;
        this.networkBufferPool = networkBufferPool;
    }

    /**
     * Creates an input gate and all of its input channels.
     *
     * @param str name of the owning task; passed to the gate and used as the log message prefix
     * @param inputGateDeploymentDescriptor descriptor providing the consumed result id, partition
     *     type, subpartition index and one shuffle descriptor per channel
     * @param partitionProducerStateProvider forwarded unchanged to the gate
     * @param inputChannelMetrics metrics forwarded to every created channel
     * @return the gate, with one input channel registered per shuffle descriptor
     */
    public SingleInputGate create(@Nonnull String str, @Nonnull InputGateDeploymentDescriptor inputGateDeploymentDescriptor, @Nonnull PartitionProducerStateProvider partitionProducerStateProvider, @Nonnull InputChannelMetrics inputChannelMetrics) {
        final int numberOfChannels = inputGateDeploymentDescriptor.getShuffleDescriptors().length;
        final ResultPartitionType consumedPartitionType = inputGateDeploymentDescriptor.getConsumedPartitionType();

        final SupplierWithException<BufferPool, IOException> bufferPoolSupplier =
                createBufferPoolFactory(
                        this.networkBufferPool,
                        this.isCreditBased,
                        this.networkBuffersPerChannel,
                        this.floatingNetworkBuffersPerGate,
                        numberOfChannels,
                        consumedPartitionType);

        final SingleInputGate inputGate =
                new SingleInputGate(
                        str,
                        inputGateDeploymentDescriptor.getConsumedResultId(),
                        consumedPartitionType,
                        inputGateDeploymentDescriptor.getConsumedSubpartitionIndex(),
                        numberOfChannels,
                        partitionProducerStateProvider,
                        this.isCreditBased,
                        bufferPoolSupplier);

        createInputChannels(str, inputGateDeploymentDescriptor, inputGate, inputChannelMetrics);
        return inputGate;
    }

    /**
     * Creates one channel per shuffle descriptor, registers each with the gate, and logs a
     * summary of how many channels of each kind were created.
     */
    private void createInputChannels(String owningTaskName, InputGateDeploymentDescriptor deploymentDescriptor, SingleInputGate inputGate, InputChannelMetrics metrics) {
        final ShuffleDescriptor[] shuffleDescriptors = deploymentDescriptor.getShuffleDescriptors();
        final ChannelStatistics statistics = new ChannelStatistics();

        for (int channelIndex = 0; channelIndex < shuffleDescriptors.length; channelIndex++) {
            final InputChannel channel =
                    createInputChannel(inputGate, channelIndex, shuffleDescriptors[channelIndex], statistics, metrics);
            // Channels are registered under the partition id nested in their ResultPartitionID.
            inputGate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
        }

        LOG.debug("{}: Created {} input channels ({}).", owningTaskName, shuffleDescriptors.length, statistics);
    }

    /**
     * Creates the channel for a single shuffle descriptor.
     *
     * <p>Dispatch is delegated to {@link ShuffleUtils#applyWithShuffleTypeCheck}: the first
     * callback builds an {@link UnknownInputChannel}, the second one handles
     * {@link NettyShuffleDescriptor}s via {@link #createKnownInputChannel}.
     */
    private InputChannel createInputChannel(SingleInputGate inputGate, int channelIndex, ShuffleDescriptor shuffleDescriptor, ChannelStatistics statistics, InputChannelMetrics metrics) {
        return ShuffleUtils.applyWithShuffleTypeCheck(
                NettyShuffleDescriptor.class,
                shuffleDescriptor,
                unknownShuffleDescriptor -> {
                    statistics.numUnknownChannels++;
                    return new UnknownInputChannel(
                            inputGate,
                            channelIndex,
                            unknownShuffleDescriptor.getResultPartitionID(),
                            this.partitionManager,
                            this.taskEventPublisher,
                            this.connectionManager,
                            this.partitionRequestInitialBackoff,
                            this.partitionRequestMaxBackoff,
                            metrics,
                            this.networkBufferPool);
                },
                nettyShuffleDescriptor ->
                        createKnownInputChannel(inputGate, channelIndex, nettyShuffleDescriptor, statistics, metrics));
    }

    /**
     * Creates a {@link LocalInputChannel} when the descriptor reports that the partition is local
     * to this task executor, and a {@link RemoteInputChannel} otherwise. The matching counter in
     * {@code statistics} is incremented.
     */
    private InputChannel createKnownInputChannel(SingleInputGate inputGate, int channelIndex, NettyShuffleDescriptor descriptor, ChannelStatistics statistics, InputChannelMetrics metrics) {
        final ResultPartitionID partitionId = descriptor.getResultPartitionID();

        if (!descriptor.isLocalTo(this.taskExecutorResourceId)) {
            statistics.numRemoteChannels++;
            return new RemoteInputChannel(
                    inputGate,
                    channelIndex,
                    partitionId,
                    descriptor.getConnectionId(),
                    this.connectionManager,
                    this.partitionRequestInitialBackoff,
                    this.partitionRequestMaxBackoff,
                    metrics,
                    this.networkBufferPool);
        }

        statistics.numLocalChannels++;
        return new LocalInputChannel(
                inputGate,
                channelIndex,
                partitionId,
                this.partitionManager,
                this.taskEventPublisher,
                this.partitionRequestInitialBackoff,
                this.partitionRequestMaxBackoff,
                metrics);
    }

    /**
     * Builds the deferred buffer pool creation for an input gate.
     *
     * <p>The returned supplier only calls {@code bufferPoolFactory.createBufferPool} when it is
     * invoked. The arguments passed to that call are:
     *
     * <ul>
     *   <li>credit-based ({@code z == true}): {@code (0, i2)} for bounded partition types
     *   <li>otherwise: {@code (i3, i3 * i + i2)} for bounded partition types
     * </ul>
     *
     * <p>For partition types that are not bounded the second argument is
     * {@link Integer#MAX_VALUE}, i.e. no practical upper limit.
     *
     * <p>NOTE(review): the two arguments of {@code createBufferPool} are taken to be the required
     * and the maximum number of buffers - confirm against {@code BufferPoolFactory}.
     *
     * @param bufferPoolFactory factory the supplier delegates to
     * @param z whether credit-based mode is enabled
     * @param i number of network buffers per channel
     * @param i2 number of floating network buffers per gate
     * @param i3 number of input channels of the gate
     * @param resultPartitionType type of the consumed partition; only consulted via
     *     {@code isBounded()}
     * @return supplier that creates the buffer pool on demand
     */
    @VisibleForTesting
    static SupplierWithException<BufferPool, IOException> createBufferPoolFactory(BufferPoolFactory bufferPoolFactory, boolean z, int i, int i2, int i3, ResultPartitionType resultPartitionType) {
        final boolean bounded = resultPartitionType.isBounded();

        if (z) {
            final int creditBasedLimit = bounded ? i2 : Integer.MAX_VALUE;
            return () -> bufferPoolFactory.createBufferPool(0, creditBasedLimit);
        }

        final int nonCreditBasedLimit = bounded ? (i3 * i) + i2 : Integer.MAX_VALUE;
        return () -> bufferPoolFactory.createBufferPool(i3, nonCreditBasedLimit);
    }
}
