package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/common/network/Selector.class */
public class Selector implements Selectable {
    private static final Logger log = LoggerFactory.getLogger(Selector.class);
    private final java.nio.channels.Selector selector;
    private final Map<Integer, SelectionKey> keys;
    private final List<NetworkSend> completedSends;
    private final List<NetworkReceive> completedReceives;
    private final List<Integer> disconnected;
    private final List<Integer> connected;
    private final Time time;
    private final SelectorMetrics sensors;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/common/network/Selector$SelectorMetrics.class */
    public class SelectorMetrics {
        private final Metrics metrics;
        public final Sensor connectionClosed;
        public final Sensor connectionCreated;
        public final Sensor bytesTransferred;
        public final Sensor bytesSent;
        public final Sensor bytesReceived;
        public final Sensor selectTime;
        public final Sensor ioTime;

        public SelectorMetrics(Metrics metrics) {
            this.metrics = metrics;
            this.connectionClosed = this.metrics.sensor("connections-closed");
            this.connectionClosed.add("connection-close-rate", "Connections closed per second in the window.", new Rate());
            this.connectionCreated = this.metrics.sensor("connections-created");
            this.connectionCreated.add("connection-creation-rate", "New connections established per second in the window.", new Rate());
            this.bytesTransferred = this.metrics.sensor("bytes-sent-received");
            this.bytesTransferred.add("network-io-rate", "The average number of network operations (reads or writes) on all connections per second.", new Rate(new Count()));
            this.bytesSent = this.metrics.sensor("bytes-sent", this.bytesTransferred);
            this.bytesSent.add("outgoing-byte-rate", "The average number of outgoing bytes sent per second to all servers.", new Rate());
            this.bytesSent.add("request-rate", "The average number of requests sent per second.", new Rate(new Count()));
            this.bytesSent.add("request-size-avg", "The average size of all requests in the window..", new Avg());
            this.bytesSent.add("request-size-max", "The maximum size of any request sent in the window.", new Max());
            this.bytesReceived = this.metrics.sensor("bytes-received", this.bytesTransferred);
            this.bytesReceived.add("incoming-byte-rate", "Bytes/second read off all sockets", new Rate());
            this.bytesReceived.add("response-rate", "Responses received sent per second.", new Rate(new Count()));
            this.selectTime = this.metrics.sensor("select-time");
            this.selectTime.add("select-rate", "Number of times the I/O layer checked for new I/O to perform per second", new Rate(new Count()));
            this.selectTime.add("io-wait-time-ns-avg", "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", new Avg());
            this.selectTime.add("io-wait-ratio", "The fraction of time the I/O thread spent waiting.", new Rate(TimeUnit.NANOSECONDS));
            this.ioTime = this.metrics.sensor("io-time");
            this.ioTime.add("io-time-ns-avg", "The average length of time for I/O per select call in nanoseconds.", new Avg());
            this.ioTime.add("io-ratio", "The fraction of time the I/O thread spent doing I/O", new Rate(TimeUnit.NANOSECONDS));
            this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() { // from class: org.apache.kafka.common.network.Selector.SelectorMetrics.1
                @Override // org.apache.kafka.common.metrics.Measurable
                public double measure(MetricConfig metricConfig, long j) {
                    return Selector.this.keys.size();
                }
            });
        }

        public void maybeRegisterNodeMetrics(int i) {
            if (i >= 0) {
                String str = "node-" + i + ".bytes-sent";
                if (this.metrics.getSensor(str) == null) {
                    Sensor sensor = this.metrics.sensor(str);
                    sensor.add("node-" + i + ".outgoing-byte-rate", new Rate());
                    sensor.add("node-" + i + ".request-rate", "The average number of requests sent per second.", new Rate(new Count()));
                    sensor.add("node-" + i + ".request-size-avg", "The average size of all requests in the window..", new Avg());
                    sensor.add("node-" + i + ".request-size-max", "The maximum size of any request sent in the window.", new Max());
                    Sensor sensor2 = this.metrics.sensor("node-" + i + ".bytes-received");
                    sensor2.add("node-" + i + ".incoming-byte-rate", new Rate());
                    sensor2.add("node-" + i + ".response-rate", "The average number of responses received per second.", new Rate(new Count()));
                    Sensor sensor3 = this.metrics.sensor("node-" + i + ".latency");
                    sensor3.add("node-" + i + ".request-latency-avg", new Avg());
                    sensor3.add("node-" + i + ".request-latency-max", new Max());
                }
            }
        }

        public void recordBytesSent(int i, int i2) {
            long milliseconds = Selector.this.time.milliseconds();
            this.bytesSent.record(i2, milliseconds);
            if (i >= 0) {
                Sensor sensor = this.metrics.getSensor("node-" + i + ".bytes-sent");
                if (sensor != null) {
                    sensor.record(i2, milliseconds);
                }
            }
        }

        public void recordBytesReceived(int i, int i2) {
            long milliseconds = Selector.this.time.milliseconds();
            this.bytesReceived.record(i2, milliseconds);
            if (i >= 0) {
                Sensor sensor = this.metrics.getSensor("node-" + i + ".bytes-received");
                if (sensor != null) {
                    sensor.record(i2, milliseconds);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/common/network/Selector$Transmissions.class */
    public static class Transmissions {
        public int id;
        public NetworkSend send;
        public NetworkReceive receive;

        public Transmissions(int i) {
            this.id = i;
        }

        public boolean hasSend() {
            return this.send != null;
        }

        public void clearSend() {
            this.send = null;
        }

        public boolean hasReceive() {
            return this.receive != null;
        }

        public void clearReceive() {
            this.receive = null;
        }
    }

    public Selector(Metrics metrics, Time time) {
        try {
            this.selector = java.nio.channels.Selector.open();
            this.time = time;
            this.keys = new HashMap();
            this.completedSends = new ArrayList();
            this.completedReceives = new ArrayList();
            this.connected = new ArrayList();
            this.disconnected = new ArrayList();
            this.sensors = new SelectorMetrics(metrics);
        } catch (IOException e) {
            throw new KafkaException(e);
        }
    }

    @Override // org.apache.kafka.common.network.Selectable
    public void connect(int i, InetSocketAddress inetSocketAddress, int i2, int i3) throws IOException {
        if (this.keys.containsKey(Integer.valueOf(i))) {
            throw new IllegalStateException("There is already a connection for id " + i);
        }
        SocketChannel open = SocketChannel.open();
        open.configureBlocking(false);
        Socket socket = open.socket();
        socket.setKeepAlive(true);
        socket.setSendBufferSize(i2);
        socket.setReceiveBufferSize(i3);
        socket.setTcpNoDelay(true);
        try {
            open.connect(inetSocketAddress);
            SelectionKey register = open.register(this.selector, 8);
            register.attach(new Transmissions(i));
            this.keys.put(Integer.valueOf(i), register);
        } catch (IOException e) {
            open.close();
            throw e;
        } catch (UnresolvedAddressException e2) {
            open.close();
            throw new IOException("Can't resolve address: " + inetSocketAddress, e2);
        }
    }

    @Override // org.apache.kafka.common.network.Selectable
    public void disconnect(int i) {
        SelectionKey selectionKey = this.keys.get(Integer.valueOf(i));
        if (selectionKey != null) {
            selectionKey.cancel();
        }
    }

    @Override // org.apache.kafka.common.network.Selectable
    public void wakeup() {
        this.selector.wakeup();
    }

    @Override // org.apache.kafka.common.network.Selectable
    public void close() {
        Iterator<SelectionKey> it = this.selector.keys().iterator();
        while (it.hasNext()) {
            close(it.next());
        }
        try {
            this.selector.close();
        } catch (IOException e) {
            log.error("Exception closing selector:", e);
        }
    }

    @Override // org.apache.kafka.common.network.Selectable
    public void poll(long j, List<NetworkSend> list) throws IOException {
        clear();
        for (NetworkSend networkSend : list) {
            SelectionKey keyForId = keyForId(networkSend.destination());
            Transmissions transmissions = transmissions(keyForId);
            if (transmissions.hasSend()) {
                throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
            }
            transmissions.send = networkSend;
            try {
                keyForId.interestOps(keyForId.interestOps() | 4);
            } catch (CancelledKeyException e) {
                close(keyForId);
            }
        }
        long nanoseconds = this.time.nanoseconds();
        int select = select(j);
        long nanoseconds2 = this.time.nanoseconds();
        this.sensors.selectTime.record(nanoseconds2 - nanoseconds, this.time.milliseconds());
        if (select > 0) {
            Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey next = it.next();
                it.remove();
                Transmissions transmissions2 = transmissions(next);
                SocketChannel channel = channel(next);
                this.sensors.maybeRegisterNodeMetrics(transmissions2.id);
                try {
                    if (next.isConnectable()) {
                        channel.finishConnect();
                        next.interestOps((next.interestOps() & (-9)) | 1);
                        this.connected.add(Integer.valueOf(transmissions2.id));
                        this.sensors.connectionCreated.record();
                    }
                    if (next.isReadable()) {
                        if (!transmissions2.hasReceive()) {
                            transmissions2.receive = new NetworkReceive(transmissions2.id);
                        }
                        transmissions2.receive.readFrom(channel);
                        if (transmissions2.receive.complete()) {
                            transmissions2.receive.payload().rewind();
                            this.completedReceives.add(transmissions2.receive);
                            this.sensors.recordBytesReceived(transmissions2.id, transmissions2.receive.payload().limit());
                            transmissions2.clearReceive();
                        }
                    }
                    if (next.isWritable()) {
                        transmissions2.send.writeTo(channel);
                        if (transmissions2.send.remaining() <= 0) {
                            this.completedSends.add(transmissions2.send);
                            this.sensors.recordBytesSent(transmissions2.id, transmissions2.send.size());
                            transmissions2.clearSend();
                            next.interestOps(next.interestOps() & (-5));
                        }
                    }
                    if (!next.isValid()) {
                        close(next);
                    }
                } catch (IOException e2) {
                    Socket socket = channel.socket();
                    log.warn("Error in I/O with {}", socket != null ? socket.getInetAddress() : null, e2);
                    close(next);
                }
            }
        }
        this.sensors.ioTime.record(this.time.nanoseconds() - nanoseconds2, this.time.milliseconds());
    }

    @Override // org.apache.kafka.common.network.Selectable
    public List<NetworkSend> completedSends() {
        return this.completedSends;
    }

    @Override // org.apache.kafka.common.network.Selectable
    public List<NetworkReceive> completedReceives() {
        return this.completedReceives;
    }

    @Override // org.apache.kafka.common.network.Selectable
    public List<Integer> disconnected() {
        return this.disconnected;
    }

    @Override // org.apache.kafka.common.network.Selectable
    public List<Integer> connected() {
        return this.connected;
    }

    private void clear() {
        this.completedSends.clear();
        this.completedReceives.clear();
        this.connected.clear();
        this.disconnected.clear();
    }

    private int select(long j) throws IOException {
        return j == 0 ? this.selector.selectNow() : j < 0 ? this.selector.select() : this.selector.select(j);
    }

    private void close(SelectionKey selectionKey) {
        SocketChannel channel = channel(selectionKey);
        Transmissions transmissions = transmissions(selectionKey);
        if (transmissions != null) {
            this.disconnected.add(Integer.valueOf(transmissions.id));
            this.keys.remove(Integer.valueOf(transmissions.id));
            transmissions.clearReceive();
            transmissions.clearSend();
        }
        selectionKey.attach(null);
        selectionKey.cancel();
        try {
            channel.socket().close();
            channel.close();
        } catch (IOException e) {
            log.error("Exception closing connection to node {}:", Integer.valueOf(transmissions.id), e);
        }
        this.sensors.connectionClosed.record();
    }

    private SelectionKey keyForId(int i) {
        SelectionKey selectionKey = this.keys.get(Integer.valueOf(i));
        if (selectionKey == null) {
            throw new IllegalStateException("Attempt to write to socket for which there is no open connection.");
        }
        return selectionKey;
    }

    private Transmissions transmissions(SelectionKey selectionKey) {
        return (Transmissions) selectionKey.attachment();
    }

    private SocketChannel channel(SelectionKey selectionKey) {
        return (SocketChannel) selectionKey.channel();
    }
}
