package kafka.controller;

import java.util.concurrent.BlockingQueue;
import kafka.api.LeaderAndIsrResponse$;
import kafka.api.RequestKeys$;
import kafka.api.RequestOrResponse;
import kafka.api.StopReplicaResponse$;
import kafka.api.UpdateMetadataResponse$;
import kafka.cluster.Broker;
import kafka.network.BlockingChannel;
import kafka.network.Receive;
import kafka.utils.ShutdownableThread;
import kafka.utils.ShutdownableThread$;
import kafka.utils.Utils$;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.ScalaObject;
import scala.Tuple2;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ControllerChannelManager.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-a\u0001C\u0001\u0003\t\u0003\u0005\t\u0011A\u0004\u0003#I+\u0017/^3tiN+g\u000e\u001a+ie\u0016\fGM\u0003\u0002\u0004\t\u0005Q1m\u001c8ue>dG.\u001a:\u000b\u0003\u0015\tQa[1gW\u0006\u001c\u0001aE\u0002\u0001\u00119\u0001\"!\u0003\u0007\u000e\u0003)Q!a\u0003\u0003\u0002\u000bU$\u0018\u000e\\:\n\u00055Q!AE*ikR$wn\u001e8bE2,G\u000b\u001b:fC\u0012\u0004\"a\u0004\n\u000e\u0003AQ\u0011!E\u0001\u0006g\u000e\fG.Y\u0005\u0003'A\u00111bU2bY\u0006|%M[3di\"AQ\u0003\u0001BC\u0002\u0013\u0005a#\u0001\u0007d_:$(o\u001c7mKJLE-F\u0001\u0018!\ty\u0001$\u0003\u0002\u001a!\t\u0019\u0011J\u001c;\t\u0011m\u0001!\u0011!Q\u0001\n]\tQbY8oiJ|G\u000e\\3s\u0013\u0012\u0004\u0003\u0002C\u000f\u0001\u0005\u000b\u0007I\u0011\u0001\u0010\u0002#\r|g\u000e\u001e:pY2,'oQ8oi\u0016DH/F\u0001 !\t\u0001\u0013%D\u0001\u0003\u0013\t\u0011#AA\tD_:$(o\u001c7mKJ\u001cuN\u001c;fqRD\u0001\u0002\n\u0001\u0003\u0002\u0003\u0006IaH\u0001\u0013G>tGO]8mY\u0016\u00148i\u001c8uKb$\b\u0005\u0003\u0005'\u0001\t\u0015\r\u0011\"\u0001(\u0003!!xN\u0011:pW\u0016\u0014X#\u0001\u0015\u0011\u0005%bS\"\u0001\u0016\u000b\u0005-\"\u0011aB2mkN$XM]\u0005\u0003[)\u0012aA\u0011:pW\u0016\u0014\b\u0002C\u0018\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0015\u0002\u0013Q|'I]8lKJ\u0004\u0003\u0002C\u0019\u0001\u0005\u000b\u0007I\u0011\u0001\u001a\u0002\u000bE,X-^3\u0016\u0003M\u00022\u0001N\u001e>\u001b\u0005)$B\u0001\u001c8\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003qe\nA!\u001e;jY*\t!(\u0001\u0003kCZ\f\u0017B\u0001\u001f6\u00055\u0011En\\2lS:<\u0017+^3vKB!qB\u0010!G\u0013\ty\u0004C\u0001\u0004UkBdWM\r\t\u0003\u0003\u0012k\u0011A\u0011\u0006\u0003\u0007\u0012\t1!\u00199j\u0013\t)%IA\tSKF,Xm\u001d;PeJ+7\u000f]8og\u0016\u0004BaD$A\u0013&\u0011\u0001\n\u0005\u0002\n\rVt7\r^5p]F\u0002\"a\u0004&\n\u0005-\u0003\"\u0001B+oSRD\u0001\"\u0014\u0001\u0003\u0002\u0003\u0006IaM\u0001\u0007cV,W/\u001a\u0011\t\u0011=\u0003!Q1A\u0005\u0002A\u000bqa\u00195b]:,G.F\u0001R!\t\u0011V+D\u0001T\u0015\t!F!A\u0004oKR<x
N]6\n\u0005Y\u001b&a\u0004\"m_\u000e\\\u0017N\\4DQ\u0006tg.\u001a7\t\u0011a\u0003!\u0011!Q\u0001\nE\u000b\u0001b\u00195b]:,G\u000e\t\u0005\u00065\u0002!\taW\u0001\u0007y%t\u0017\u000e\u001e \u0015\rqkfl\u00181b!\t\u0001\u0003\u0001C\u0003\u00163\u0002\u0007q\u0003C\u0003\u001e3\u0002\u0007q\u0004C\u0003'3\u0002\u0007\u0001\u0006C\u000323\u0002\u00071\u0007C\u0003P3\u0002\u0007\u0011\u000bC\u0004d\u0001\t\u0007I\u0011\u00023\u0002\t1|7m[\u000b\u0002KB\u0011a-[\u0007\u0002O*\u0011\u0001.O\u0001\u0005Y\u0006tw-\u0003\u0002kO\n1qJ\u00196fGRDa\u0001\u001c\u0001!\u0002\u0013)\u0017!\u00027pG.\u0004\u0003b\u00028\u0001\u0005\u0004%Ia\\\u0001\u0012gR\fG/Z\"iC:<W\rT8hO\u0016\u0014X#\u00019\u0011\u0005EDX\"\u0001:\u000b\u0005M$\u0018!\u00027pORR'BA;w\u0003\u0019\t\u0007/Y2iK*\tq/A\u0002pe\u001eL!!\u001f:\u0003\r1{wmZ3s\u0011\u0019Y\b\u0001)A\u0005a\u0006\u00112\u000f^1uK\u000eC\u0017M\\4f\u0019><w-\u001a:!\u0011\u0015i\b\u0001\"\u0011\u007f\u0003\u0019!wnV8sWR\t\u0011\nC\u0004\u0002\u0002\u0001!I!a\u0001\u0002\u001f\r|gN\\3diR{'I]8lKJ$R!SA\u0003\u0003\u0013Aa!a\u0002��\u0001\u0004A\u0013A\u00022s_.,'\u000fC\u0003P\u007f\u0002\u0007\u0011\u000b")
/* loaded from: input_file:kafka/controller/RequestSendThread.class */
/*
 * Worker thread that takes (request, callback) pairs from a blocking queue,
 * sends each request over a BlockingChannel, reads the matching response and
 * hands it to the callback.
 *
 * This source is decompiler output of ControllerChannelManager.scala. The
 * RequestSendThread$$anonfun$... classes referenced below are compiler-generated
 * closures defined outside this file (they supply log messages and similar), so
 * their names and constructor arguments must be kept exactly as they are.
 */
public class RequestSendThread extends ShutdownableThread implements ScalaObject {
    private final int controllerId;
    private final ControllerContext controllerContext;
    private final Broker toBroker;
    private final BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, Object>>> queue;
    private final BlockingChannel channel;
    /** Monitor held for the whole send/receive/callback exchange in {@link #doWork()}. */
    private final Object lock;
    private final Logger stateChangeLogger;

    /** @return the controller id passed to the constructor */
    public int controllerId() {
        return this.controllerId;
    }

    /** @return the controller context passed to the constructor */
    public ControllerContext controllerContext() {
        return this.controllerContext;
    }

    /** @return the broker this thread sends requests to */
    public Broker toBroker() {
        return this.toBroker;
    }

    /** @return the queue of (request, callback) pairs consumed by {@link #doWork()} */
    public BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, Object>>> queue() {
        return this.queue;
    }

    /** @return the channel used to send requests and receive responses */
    public BlockingChannel channel() {
        return this.channel;
    }

    private Object lock() {
        return this.lock;
    }

    private Logger stateChangeLogger() {
        return this.stateChangeLogger;
    }

    /**
     * Processes one queued item: takes the next (request, callback) pair, sends the
     * request (retrying until it succeeds or the thread stops running), reads and
     * decodes the response, logs it at trace level and invokes the callback when
     * one was supplied.
     *
     * <p>If the thread stops running before the request could be sent, no response
     * is read and the callback is not invoked; the channel is disconnected and the
     * method returns. Any throwable raised during the exchange is logged as a
     * warning and the channel is disconnected.
     */
    @Override // kafka.utils.ShutdownableThread
    public void doWork() {
        // BlockingQueue.take() blocks until an item is available.
        Tuple2<RequestOrResponse, Function1<RequestOrResponse, Object>> queueItem = queue().take();
        RequestOrResponse request = (RequestOrResponse) queueItem._1();
        Function1<RequestOrResponse, Object> callback = (Function1<RequestOrResponse, Object>) queueItem._2();
        try {
            synchronized (lock()) {
                if (!sendUntilDoneOrStopped(request)) {
                    // The retry loop ended because the thread is no longer running, not
                    // because the request went out. There is no response to wait for, so
                    // do not call receive(); just leave the channel disconnected.
                    channel().disconnect();
                    return;
                }
                Receive receive = channel().receive();
                RequestOrResponse response = readResponse(request, receive);
                // copy$default$2() is the decompiler's rendering of the accessor feeding the
                // "correlationId %d" slot - presumably the response's correlation id.
                stateChangeLogger().trace(Predef$.MODULE$.augmentString("Controller %d epoch %d received response correlationId %d for a request sent to broker %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(controllerId()), BoxesRunTime.boxToInteger(controllerContext().epoch()), BoxesRunTime.boxToInteger(response.copy$default$2()), toBroker().toString()})));
                if (callback != null) {
                    callback.apply(response);
                }
            }
        } catch (Throwable th) {
            warn(new RequestSendThread$$anonfun$doWork$1(this), new RequestSendThread$$anonfun$doWork$2(this, th));
            // After a failure mid-exchange the channel state is unknown; drop the connection.
            channel().disconnect();
        }
    }

    /**
     * Repeats {@link #trySend(RequestOrResponse)} while the thread is running and the
     * request has not yet been sent.
     *
     * @param request the request to send
     * @return {@code true} if the request was sent; {@code false} if the loop ended
     *         because {@code isRunning()} became false first
     */
    private boolean sendUntilDoneOrStopped(RequestOrResponse request) {
        boolean sent = false;
        while (isRunning().get() && !sent) {
            sent = trySend(request);
        }
        return sent;
    }

    /**
     * Makes a single attempt to send the request on the channel. On failure the
     * error is logged, the channel is disconnected and reconnected to the target
     * broker, and the externally defined closure is run via {@code Utils.swallow}
     * (presumably a back-off pause before the next attempt - its body is not
     * visible in this file).
     *
     * @param request the request to send
     * @return {@code true} if the send completed without throwing, {@code false} otherwise
     */
    private boolean trySend(RequestOrResponse request) {
        try {
            channel().send(request);
            return true;
        } catch (Throwable th) {
            error(new RequestSendThread$$anonfun$liftedTree1$1$2(this, request), new RequestSendThread$$anonfun$liftedTree1$1$3(this, th));
            channel().disconnect();
            connectToBroker(toBroker(), channel());
            Utils$.MODULE$.swallow(new RequestSendThread$$anonfun$liftedTree1$1$1(this));
            return false;
        }
    }

    /**
     * Decodes the received buffer with the response reader that matches the
     * request's key.
     *
     * @param request the request that was sent; its {@code requestId} selects the reader
     * @param receive the data received from the channel
     * @return the decoded response
     * @throws MatchError if the request key is not LeaderAndIsr, StopReplica or UpdateMetadata
     */
    private RequestOrResponse readResponse(RequestOrResponse request, Receive receive) {
        short requestKey = BoxesRunTime.unboxToShort(request.requestId().get());
        if (requestKey == RequestKeys$.MODULE$.LeaderAndIsrKey()) {
            return LeaderAndIsrResponse$.MODULE$.readFrom(receive.buffer());
        }
        if (requestKey == RequestKeys$.MODULE$.StopReplicaKey()) {
            return StopReplicaResponse$.MODULE$.readFrom(receive.buffer());
        }
        if (requestKey == RequestKeys$.MODULE$.UpdateMetadataKey()) {
            return UpdateMetadataResponse$.MODULE$.readFrom(receive.buffer());
        }
        throw new MatchError(BoxesRunTime.boxToShort(requestKey));
    }

    /**
     * Connects the given channel and logs the outcome. A failure is not propagated:
     * the channel is disconnected and the error is logged.
     *
     * @param broker          the broker being connected to (passed to the log-message closures)
     * @param blockingChannel the channel to connect
     */
    private void connectToBroker(Broker broker, BlockingChannel blockingChannel) {
        try {
            blockingChannel.connect();
            info((Function0<String>) new RequestSendThread$$anonfun$connectToBroker$1(this, broker));
        } catch (Throwable th) {
            blockingChannel.disconnect();
            error(new RequestSendThread$$anonfun$connectToBroker$2(this, broker), new RequestSendThread$$anonfun$connectToBroker$3(this, th));
        }
    }

    /**
     * Creates the thread and immediately attempts to connect the channel to the broker.
     * The thread name is "Controller-%d-to-broker-%d-send-thread", filled with the
     * controller id and {@code broker.copy$default$1()} (presumably the broker id).
     *
     * @param i                 the controller id
     * @param controllerContext the controller context (its epoch is used in log output)
     * @param broker            the broker to send requests to
     * @param blockingQueue     the queue of (request, callback) pairs to process
     * @param blockingChannel   the channel used for the exchange
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public RequestSendThread(int i, ControllerContext controllerContext, Broker broker, BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, Object>>> blockingQueue, BlockingChannel blockingChannel) {
        super(Predef$.MODULE$.augmentString("Controller-%d-to-broker-%d-send-thread").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(i), BoxesRunTime.boxToInteger(broker.copy$default$1())})), ShutdownableThread$.MODULE$.init$default$2());
        this.controllerId = i;
        this.controllerContext = controllerContext;
        this.toBroker = broker;
        this.queue = blockingQueue;
        this.channel = blockingChannel;
        this.lock = new Object();
        this.stateChangeLogger = Logger.getLogger(KafkaController$.MODULE$.stateChangeLogger());
        connectToBroker(broker, blockingChannel);
    }
}
