package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.class */
public class PartitionRequestClientFactory {
    private final NettyClient nettyClient;
    private final ConcurrentMap<ConnectionID, Object> clients = new ConcurrentHashMap();

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory$ConnectingChannel.class */
    private static final class ConnectingChannel implements ChannelFutureListener {
        private final ConnectionID connectionId;
        private final PartitionRequestClientFactory clientFactory;
        private volatile NettyPartitionRequestClient partitionRequestClient;
        private volatile Throwable error;
        private final Object connectLock = new Object();
        private boolean disposeRequestClient = false;

        public ConnectingChannel(ConnectionID connectionID, PartitionRequestClientFactory partitionRequestClientFactory) {
            this.connectionId = connectionID;
            this.clientFactory = partitionRequestClientFactory;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean dispose() {
            boolean z;
            synchronized (this.connectLock) {
                if (this.partitionRequestClient != null) {
                    z = this.partitionRequestClient.disposeIfNotUsed();
                } else {
                    this.disposeRequestClient = true;
                    z = true;
                }
                this.connectLock.notifyAll();
            }
            return z;
        }

        private void handInChannel(Channel channel) {
            synchronized (this.connectLock) {
                try {
                    this.partitionRequestClient = new NettyPartitionRequestClient(channel, (NetworkClientHandler) channel.pipeline().get(NetworkClientHandler.class), this.connectionId, this.clientFactory);
                    if (this.disposeRequestClient) {
                        this.partitionRequestClient.disposeIfNotUsed();
                    }
                    this.connectLock.notifyAll();
                } catch (Throwable th) {
                    notifyOfError(th);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public NettyPartitionRequestClient waitForChannel() throws IOException, InterruptedException {
            synchronized (this.connectLock) {
                while (this.error == null && this.partitionRequestClient == null) {
                    this.connectLock.wait(2000L);
                }
            }
            if (this.error != null) {
                throw new IOException("Connecting the channel failed: " + this.error.getMessage(), this.error);
            }
            return this.partitionRequestClient;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void notifyOfError(Throwable th) {
            synchronized (this.connectLock) {
                this.error = th;
                this.connectLock.notifyAll();
            }
        }

        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                handInChannel(channelFuture.channel());
            } else if (channelFuture.cause() != null) {
                notifyOfError(new RemoteTransportException("Connecting to remote task manager + '" + this.connectionId.getAddress() + "' has failed. This might indicate that the remote task manager has been lost.", this.connectionId.getAddress(), channelFuture.cause()));
            } else {
                notifyOfError(new LocalTransportException(String.format("Connecting to remote task manager '%s' has been cancelled.", this.connectionId.getAddress()), null));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PartitionRequestClientFactory(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connectionID) throws IOException, InterruptedException {
        NettyPartitionRequestClient nettyPartitionRequestClient = null;
        while (nettyPartitionRequestClient == null) {
            Object obj = this.clients.get(connectionID);
            if (obj == null) {
                ConnectingChannel connectingChannel = new ConnectingChannel(connectionID, this);
                Object putIfAbsent = this.clients.putIfAbsent(connectionID, connectingChannel);
                if (putIfAbsent == null) {
                    try {
                        this.nettyClient.connect(connectionID.getAddress()).addListener(connectingChannel);
                        nettyPartitionRequestClient = connectingChannel.waitForChannel();
                        this.clients.replace(connectionID, connectingChannel, nettyPartitionRequestClient);
                    } catch (Exception e) {
                        connectingChannel.notifyOfError(e);
                        throw new IOException("Connecting the channel failed: " + e.getMessage(), e);
                    }
                } else if (putIfAbsent instanceof ConnectingChannel) {
                    nettyPartitionRequestClient = ((ConnectingChannel) putIfAbsent).waitForChannel();
                    this.clients.replace(connectionID, putIfAbsent, nettyPartitionRequestClient);
                } else {
                    nettyPartitionRequestClient = (NettyPartitionRequestClient) putIfAbsent;
                }
            } else if (obj instanceof NettyPartitionRequestClient) {
                nettyPartitionRequestClient = (NettyPartitionRequestClient) obj;
            } else {
                ConnectingChannel connectingChannel2 = (ConnectingChannel) obj;
                nettyPartitionRequestClient = connectingChannel2.waitForChannel();
                this.clients.replace(connectionID, connectingChannel2, nettyPartitionRequestClient);
            }
            if (!nettyPartitionRequestClient.incrementReferenceCounter()) {
                destroyPartitionRequestClient(connectionID, nettyPartitionRequestClient);
                nettyPartitionRequestClient = null;
            }
        }
        return nettyPartitionRequestClient;
    }

    public void closeOpenChannelConnections(ConnectionID connectionID) {
        Object obj = this.clients.get(connectionID);
        if (obj instanceof ConnectingChannel) {
            ConnectingChannel connectingChannel = (ConnectingChannel) obj;
            if (connectingChannel.dispose()) {
                this.clients.remove(connectionID, connectingChannel);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getNumberOfActiveClients() {
        return this.clients.size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void destroyPartitionRequestClient(ConnectionID connectionID, PartitionRequestClient partitionRequestClient) {
        this.clients.remove(connectionID, partitionRequestClient);
    }
}
