package kafka.network;

import java.io.EOFException;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import kafka.network.RequestChannel;
import kafka.utils.SystemTime$;
import kafka.utils.Time;
import scala.Function0;
import scala.ScalaObject;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.ObjectRef;

/* compiled from: SocketServer.scala */
@ScalaSignature(bytes = "\u0006\u0001M4Q!\u0001\u0002\u0001\t\u0019\u0011\u0011\u0002\u0015:pG\u0016\u001c8o\u001c:\u000b\u0005\r!\u0011a\u00028fi^|'o\u001b\u0006\u0002\u000b\u0005)1.\u00194lCN\u0019\u0001aB\u0006\u0011\u0005!IQ\"\u0001\u0002\n\u0005)\u0011!\u0001F!cgR\u0014\u0018m\u0019;TKJ4XM\u001d+ie\u0016\fG\r\u0005\u0002\r\u001f5\tQBC\u0001\u000f\u0003\u0015\u00198-\u00197b\u0013\t\u0001RBA\u0006TG\u0006d\u0017m\u00142kK\u000e$\b\u0002\u0003\n\u0001\u0005\u000b\u0007I\u0011\u0001\u000b\u0002\u0005%$7\u0001A\u000b\u0002+A\u0011ABF\u0005\u0003/5\u00111!\u00138u\u0011!I\u0002A!A!\u0002\u0013)\u0012aA5eA!A1\u0004\u0001BC\u0002\u0013\u0005A$\u0001\u0003uS6,W#A\u000f\u0011\u0005y\tS\"A\u0010\u000b\u0005\u0001\"\u0011!B;uS2\u001c\u0018B\u0001\u0012 \u0005\u0011!\u0016.\\3\t\u0011\u0011\u0002!\u0011!Q\u0001\nu\tQ\u0001^5nK\u0002B\u0001B\n\u0001\u0003\u0006\u0004%\t\u0001F\u0001\u000f[\u0006D(+Z9vKN$8+\u001b>f\u0011!A\u0003A!A!\u0002\u0013)\u0012aD7bqJ+\u0017/^3tiNK'0\u001a\u0011\t\u0011)\u0002!Q1A\u0005\u0002-\naB]3rk\u0016\u001cHo\u00115b]:,G.F\u0001-!\tAQ&\u0003\u0002/\u0005\tq!+Z9vKN$8\t[1o]\u0016d\u0007\u0002\u0003\u0019\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0017\u0002\u001fI,\u0017/^3ti\u000eC\u0017M\u001c8fY\u0002BQA\r\u0001\u0005\u0002M\na\u0001P5oSRtD#\u0002\u001b6m]B\u0004C\u0001\u0005\u0001\u0011\u0015\u0011\u0012\u00071\u0001\u0016\u0011\u0015Y\u0012\u00071\u0001\u001e\u0011\u00151\u0013\u00071\u0001\u0016\u0011\u0015Q\u0013\u00071\u0001-\u0011\u001dQ\u0004A1A\u0005\nm\naB\\3x\u0007>tg.Z2uS>t7/F\u0001=!\riDIR\u0007\u0002})\u0011q\bQ\u0001\u000bG>t7-\u001e:sK:$(BA!C\u0003\u0011)H/\u001b7\u000b\u0003\r\u000bAA[1wC&\u0011QI\u0010\u0002\u0016\u0007>t7-\u001e:sK:$H*\u001b8lK\u0012\fV/Z;f!\t9E*D\u0001I\u0015\tI%*\u0001\u0005dQ\u0006tg.\u001a7t\u0015\tY%)A\u0002oS>L!!\u0014%\u0003\u001bM{7m[3u\u0007\"\fgN\\3m\u0011\u0019y\u0005\u0001)A\u0005y\u0005ya.Z<D_:tWm\u0019;j_:\u001c\b\u0005C\u0003R\u0001\u0011\u0005#+A\u0002sk:$\u0012a\u0015\t\u0003\u0019QK!!V\u0007\u0003\tUs\u0017\u000e\u001e\u0005\u0006/\u0002!IAU\u0001\u0014aJ|7-Z:t\u001d\u0016<(+Z:q_:\u001cXm\u001d\u0005\u00063\u0002!IAW\u0001\u0006G2|7/\u001a\u000b\u0003'nCQ\u0001\u0018-A\u0002u\u000b1a[3z!\t9e,\u0003\u0002`\u0011\na1+\u001a7fGRLwN\\&fs\")\u0011\r\u0001C\u0001E\u00061\u0011mY2faR$\"aU2\t\u000b\u0011\u0004\u0007\u0019\u0001$\u0002\u001bM|7m[3u\u0007\"\fgN\\3m\u0011\u00151\u0007\u0001\"\u0003S\u0003]\u0019wN\u001c4jOV\u0014XMT3x\u0007>tg.Z2uS>t7\u000fC\u0003i\u0001\u0011\u0005\u0011.\u0001\u0003sK\u0006$GCA*k\u0011\u0015av\r1\u0001^\u0011\u0015a\u0007\u0001\"\u0001n\u0003\u00159(/\u001b;f)\t\u0019f\u000eC\u0003]W\u0002\u0007Q\fC\u0003q\u0001\u0011%\u0011/\u0001\u0006dQ\u0006tg.\u001a7G_J$\"A\u0012:\t\u000bq{\u0007\u0019A/")
/* loaded from: input_file:kafka/network/Processor.class */
public class Processor extends AbstractServerThread implements ScalaObject {
    private final int id;
    private final Time time;
    private final int maxRequestSize;
    private final RequestChannel requestChannel;
    private final ConcurrentLinkedQueue<SocketChannel> newConnections = new ConcurrentLinkedQueue<>();

    public int id() {
        return this.id;
    }

    public Time time() {
        return this.time;
    }

    public int maxRequestSize() {
        return this.maxRequestSize;
    }

    public RequestChannel requestChannel() {
        return this.requestChannel;
    }

    private ConcurrentLinkedQueue<SocketChannel> newConnections() {
        return this.newConnections;
    }

    @Override // java.lang.Runnable
    public void run() {
        startupComplete();
        loop0: while (isRunning()) {
            configureNewConnections();
            processNewResponses();
            long milliseconds = SystemTime$.MODULE$.milliseconds();
            int select = selector().select(300L);
            trace((Function0<String>) new Processor$$anonfun$run$7(this, milliseconds));
            if (select > 0) {
                Iterator<SelectionKey> it = selector().selectedKeys().iterator();
                while (it.hasNext() && isRunning()) {
                    ObjectRef objectRef = new ObjectRef(null);
                    try {
                        objectRef.elem = it.next();
                        it.remove();
                        if (((SelectionKey) objectRef.elem).isReadable()) {
                            read((SelectionKey) objectRef.elem);
                        } else if (((SelectionKey) objectRef.elem).isWritable()) {
                            write((SelectionKey) objectRef.elem);
                        } else {
                            if (((SelectionKey) objectRef.elem).isValid()) {
                                throw new IllegalStateException("Unrecognized key state for processor thread.");
                                break loop0;
                            }
                            close((SelectionKey) objectRef.elem);
                        }
                    } catch (EOFException e) {
                        info((Function0<String>) new Processor$$anonfun$run$8(this, objectRef));
                        close((SelectionKey) objectRef.elem);
                    } catch (InvalidRequestException e2) {
                        info((Function0<String>) new Processor$$anonfun$run$9(this, objectRef, e2));
                        close((SelectionKey) objectRef.elem);
                    } catch (Throwable th) {
                        error(new Processor$$anonfun$run$10(this, objectRef), new Processor$$anonfun$run$11(this, th));
                        close((SelectionKey) objectRef.elem);
                    }
                }
            }
        }
        debug((Function0<String>) new Processor$$anonfun$run$12(this));
        swallowError(new Processor$$anonfun$run$3(this));
        shutdownComplete();
    }

    /*  JADX ERROR: JadxRuntimeException in pass: BlockSplitter
        jadx.core.utils.exceptions.JadxRuntimeException: Unexpected missing predecessor for block: B:3:0x0016
        	at jadx.core.dex.visitors.blocks.BlockSplitter.addTempConnectionsForExcHandlers(BlockSplitter.java:275)
        	at jadx.core.dex.visitors.blocks.BlockSplitter.visit(BlockSplitter.java:68)
        */
    private void processNewResponses() {
        /*
            Method dump skipped, instructions count: 372
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: kafka.network.Processor.processNewResponses():void");
    }

    private void close(SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        debug((Function0<String>) new Processor$$anonfun$close$4(this, socketChannel));
        swallowError(new Processor$$anonfun$close$1(this, socketChannel));
        swallowError(new Processor$$anonfun$close$2(this, socketChannel));
        selectionKey.attach(null);
        swallowError(new Processor$$anonfun$close$3(this, selectionKey));
    }

    public void accept(SocketChannel socketChannel) {
        newConnections().add(socketChannel);
        wakeup();
    }

    private void configureNewConnections() {
        while (newConnections().size() > 0) {
            SocketChannel poll = newConnections().poll();
            debug((Function0<String>) new Processor$$anonfun$configureNewConnections$1(this, poll));
            poll.register(selector(), 1);
        }
    }

    public void read(SelectionKey selectionKey) {
        SocketChannel kafka$network$Processor$$channelFor = kafka$network$Processor$$channelFor(selectionKey);
        Receive receive = (Receive) selectionKey.attachment();
        if (selectionKey.attachment() == null) {
            receive = new BoundedByteBufferReceive(maxRequestSize());
            selectionKey.attach(receive);
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        int readFrom = receive.readFrom(kafka$network$Processor$$channelFor);
        SocketAddress remoteSocketAddress = kafka$network$Processor$$channelFor.socket().getRemoteSocketAddress();
        trace((Function0<String>) new Processor$$anonfun$read$1(this, readFrom, remoteSocketAddress));
        if (readFrom < 0) {
            close(selectionKey);
            return;
        }
        if (!receive.complete()) {
            trace((Function0<String>) new Processor$$anonfun$read$2(this, kafka$network$Processor$$channelFor));
            selectionKey.interestOps(1);
            wakeup();
        } else {
            requestChannel().sendRequest(new RequestChannel.Request(id(), selectionKey, receive.buffer(), time().milliseconds(), remoteSocketAddress));
            selectionKey.attach(null);
            selectionKey.interestOps(selectionKey.interestOps() & (1 ^ (-1)));
        }
    }

    public void write(SelectionKey selectionKey) {
        SocketChannel kafka$network$Processor$$channelFor = kafka$network$Processor$$channelFor(selectionKey);
        RequestChannel.Response response = (RequestChannel.Response) selectionKey.attachment();
        Send responseSend = response.responseSend();
        if (responseSend == null) {
            throw new IllegalStateException("Registered for write interest but no response attached to key.");
        }
        trace((Function0<String>) new Processor$$anonfun$write$1(this, selectionKey, kafka$network$Processor$$channelFor, responseSend.writeTo(kafka$network$Processor$$channelFor)));
        if (!responseSend.complete()) {
            trace((Function0<String>) new Processor$$anonfun$write$3(this, kafka$network$Processor$$channelFor));
            selectionKey.interestOps(4);
            wakeup();
        } else {
            response.request().updateRequestMetrics();
            selectionKey.attach(null);
            trace((Function0<String>) new Processor$$anonfun$write$2(this, kafka$network$Processor$$channelFor));
            selectionKey.interestOps(1);
        }
    }

    public final SocketChannel kafka$network$Processor$$channelFor(SelectionKey selectionKey) {
        return (SocketChannel) selectionKey.channel();
    }

    public Processor(int i, Time time, int i2, RequestChannel requestChannel) {
        this.id = i;
        this.time = time;
        this.maxRequestSize = i2;
        this.requestChannel = requestChannel;
    }
}
