/*
 * Decompiled with CFR 0.152.
 */
package kafka.controller;

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import kafka.cluster.Broker;
import kafka.cluster.BrokerEndPoint;
import kafka.controller.ControllerBrokerStateInfo;
import kafka.controller.ControllerChannelManager$;
import kafka.controller.ControllerContext;
import kafka.controller.QueueItem;
import kafka.controller.RequestSendThread;
import kafka.server.KafkaConfig;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.Logging$class;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.LoginType;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractRequestResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/*
 * Duplicate member names - consider using --renamedupmembers true
 */
@ScalaSignature(bytes="\u0006\u0001\u0005]e\u0001B\u0001\u0003\u0001\u001d\u0011\u0001dQ8oiJ|G\u000e\\3s\u0007\"\fgN\\3m\u001b\u0006t\u0017mZ3s\u0015\t\u0019A!\u0001\u0006d_:$(o\u001c7mKJT\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\r\u0001\u0001B\u0004\t\u0003\u00131i\u0011A\u0003\u0006\u0002\u0017\u0005)1oY1mC&\u0011QB\u0003\u0002\u0007\u0003:L(+\u001a4\u0011\u0005=\u0011R\"\u0001\t\u000b\u0005E!\u0011!B;uS2\u001c\u0018BA\n\u0011\u0005\u001daunZ4j]\u001eD\u0001\"\u0006\u0001\u0003\u0002\u0003\u0006IAF\u0001\u0012G>tGO]8mY\u0016\u00148i\u001c8uKb$\bCA\f\u0019\u001b\u0005\u0011\u0011BA\r\u0003\u0005E\u0019uN\u001c;s_2dWM]\"p]R,\u0007\u0010\u001e\u0005\t7\u0001\u0011\t\u0011)A\u00059\u000511m\u001c8gS\u001e\u0004\"!\b\u0011\u000e\u0003yQ!a\b\u0003\u0002\rM,'O^3s\u0013\t\tcDA\u0006LC\u001a\\\u0017mQ8oM&<\u0007\u0002C\u0012\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0013\u0002\tQLW.\u001a\t\u0003K9j\u0011A\n\u0006\u0003#\u001dR!\u0001K\u0015\u0002\r\r|W.\\8o\u0015\t)!F\u0003\u0002,Y\u00051\u0011\r]1dQ\u0016T\u0011!L\u0001\u0004_J<\u0017BA\u0018'\u0005\u0011!\u0016.\\3\t\u0011E\u0002!\u0011!Q\u0001\nI\nq!\\3ue&\u001c7\u000f\u0005\u00024k5\tAG\u0003\u00022O%\u0011a\u0007\u000e\u0002\b\u001b\u0016$(/[2t\u0011!A\u0004A!A!\u0002\u0013I\u0014\u0001\u0005;ie\u0016\fGMT1nKB\u0013XMZ5y!\rI!\bP\u0005\u0003w)\u0011aa\u00149uS>t\u0007CA\u001fA\u001d\tIa(\u0003\u0002@\u0015\u00051\u0001K]3eK\u001aL!!\u0011\"\u0003\rM#(/\u001b8h\u0015\ty$\u0002C\u0003E\u0001\u0011\u0005Q)\u0001\u0004=S:LGO\u0010\u000b\u0007\r\u001eC\u0015JS&\u0011\u0005]\u0001\u0001\"B\u000bD\u0001\u00041\u0002\"B\u000eD\u0001\u0004a\u0002\"B\u0012D\u0001\u0004!\u0003\"B\u0019D\u0001\u0004\u0011\u0004b\u0002\u001dD!\u0003\u0005\r!\u000f\u0005\b\u001b\u0002\u0011\r\u0011\"\u0005O\u0003=\u0011'o\\6feN#\u0018\r^3J]\u001a|W#A(\u0011\tA+vKW\u0007\u0002#*\u0011!kU\u0001\b[V$\u0018M\u00197f\u0015\t!&\"\u0001\u0006d_2dWm\u0019;j_:L!AV)\u0003\u000f!\u000b7\u000f['baB\u0011\u0011\u0002W\u0005\u00033*\u00111!\u00138u!\t92,\u00
03\u0002]\u0005\tI2i\u001c8ue>dG.\u001a:Ce>\\WM]*uCR,\u0017J\u001c4p\u0011\u0019q\u0006\u0001)A\u0005\u001f\u0006\u0001\"M]8lKJ\u001cF/\u0019;f\u0013:4w\u000e\t\u0005\bA\u0002\u0011\r\u0011\"\u0003b\u0003)\u0011'o\\6fe2{7m[\u000b\u0002EB\u00111\r[\u0007\u0002I*\u0011QMZ\u0001\u0005Y\u0006twMC\u0001h\u0003\u0011Q\u0017M^1\n\u0005%$'AB(cU\u0016\u001cG\u000f\u0003\u0004l\u0001\u0001\u0006IAY\u0001\fEJ|7.\u001a:M_\u000e\\\u0007\u0005C\u0003n\u0001\u0011\u0005a.A\u0004ti\u0006\u0014H/\u001e9\u0015\u0003=\u0004\"!\u00039\n\u0005ET!\u0001B+oSRDQa\u001d\u0001\u0005\u00029\f\u0001b\u001d5vi\u0012|wO\u001c\u0005\u0006k\u0002!\tA^\u0001\fg\u0016tGMU3rk\u0016\u001cH\u000fF\u0005pof\f\u0019!a\u0004\u0002 !)\u0001\u0010\u001ea\u0001/\u0006A!M]8lKJLE\rC\u0003{i\u0002\u000710\u0001\u0004ba&\\U-\u001f\t\u0003y~l\u0011! \u0006\u0003}\u001e\n\u0001\u0002\u001d:pi>\u001cw\u000e\\\u0005\u0004\u0003\u0003i(aB!qS.+\u0017p\u001d\u0005\b\u0003\u000b!\b\u0019AA\u0004\u0003)\t\u0007/\u001b,feNLwN\u001c\t\u0005\u0013i\nI\u0001E\u0002\n\u0003\u0017I1!!\u0004\u000b\u0005\u0015\u0019\u0006n\u001c:u\u0011\u001d\t\t\u0002\u001ea\u0001\u0003'\tqA]3rk\u0016\u001cH\u000f\u0005\u0003\u0002\u0016\u0005mQBAA\f\u0015\r\tIbJ\u0001\te\u0016\fX/Z:ug&!\u0011QDA\f\u0005=\t%m\u001d;sC\u000e$(+Z9vKN$\b\"CA\u0011iB\u0005\t\u0019AA\u0012\u0003!\u0019\u0017\r\u001c7cC\u000e\\\u0007CB\u0005\u0002&\u0005%r.C\u0002\u0002()\u0011\u0011BR;oGRLwN\\\u0019\u0011\t\u0005U\u00111F\u0005\u0005\u0003[\t9BA\fBEN$(/Y2u%\u0016\fX/Z:u%\u0016\u001c\bo\u001c8tK\"9\u0011\u0011\u0007\u0001\u0005\u0002\u0005M\u0012!C1eI\n\u0013xn[3s)\ry\u0017Q\u0007\u0005\t\u0003o\ty\u00031\u0001\u0002:\u00051!M]8lKJ\u0004B!a\u000f\u0002B5\u0011\u0011Q\b\u0006\u0004\u0003\u007f!\u0011aB2mkN$XM]\u0005\u0005\u0003\u0007\niD\u0001\u0004Ce>\\WM\u001d\u0005\b\u0003\u000f\u0002A\u0011AA%\u00031\u0011X-\\8wK\n\u0013xn[3s)\ry\u00171\n\u0005\u0007q\u0006\u0015\u0003\u0019A,\t\u000f\u0005=\u0003\u0001\"\u0003\u0002R\u0005a\u0011\r\u001a3OK^\u0014%o\\6feR\u0019q.
a\u0015\t\u0011\u0005]\u0012Q\na\u0001\u0003sAq!a\u0016\u0001\t\u0013\tI&\u0001\u000bsK6|g/Z#ySN$\u0018N\\4Ce>\\WM\u001d\u000b\u0004_\u0006m\u0003bBA/\u0003+\u0002\rAW\u0001\fEJ|7.\u001a:Ti\u0006$X\rC\u0004\u0002b\u0001!\t\"a\u0019\u0002-M$\u0018M\u001d;SKF,Xm\u001d;TK:$G\u000b\u001b:fC\u0012$2a\\A3\u0011\u0019A\u0018q\fa\u0001/\"I\u0011\u0011\u000e\u0001\u0012\u0002\u0013\u0005\u00111N\u0001\u0016g\u0016tGMU3rk\u0016\u001cH\u000f\n3fM\u0006,H\u000e\u001e\u00136+\t\tiG\u000b\u0003\u0002$\u0005=4FAA9!\u0011\t\u0019(! \u000e\u0005\u0005U$\u0002BA<\u0003s\n\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005m$\"\u0001\u0006b]:|G/\u0019;j_:LA!a \u0002v\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\b\u0013\u0005\r%!!A\t\u0002\u0005\u0015\u0015\u0001G\"p]R\u0014x\u000e\u001c7fe\u000eC\u0017M\u001c8fY6\u000bg.Y4feB\u0019q#a\"\u0007\u0011\u0005\u0011\u0011\u0011!E\u0001\u0003\u0013\u001b2!a\"\t\u0011\u001d!\u0015q\u0011C\u0001\u0003\u001b#\"!!\"\t\u0015\u0005E\u0015qQI\u0001\n\u0003\t\u0019*A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$H%N\u000b\u0003\u0003+S3!OA8\u0001")
public class ControllerChannelManager
implements Logging {
    // Shared controller state; the constructor reads liveBrokers() from it and it is passed to each RequestSendThread.
    private final ControllerContext controllerContext;
    // Broker configuration. Public with a compiler-mangled name; the logging closure in addNewBroker reads it through $outer.
    public final KafkaConfig kafka$controller$ControllerChannelManager$$config;
    // Clock handed to the Selector, NetworkClient and RequestSendThread created for each broker.
    private final Time time;
    // Metrics registry handed to each per-broker Selector.
    private final Metrics metrics;
    // Optional prefix; when present it is prepended (followed by ':') to the send-thread name built in addNewBroker.
    private final Option<String> threadNamePrefix;
    // Per-broker channel state, keyed by the boxed broker id.
    private final HashMap<Object, ControllerBrokerStateInfo> brokerStateInfo;
    // Monitor held by startup/shutdown/sendRequest/addBroker/removeBroker while they touch brokerStateInfo.
    private final Object brokerLock;
    // Backing fields for the kafka.utils.Logging trait members implemented by this class.
    private final String loggerName;
    private final Logger logger;
    private String logIdent;
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    // Set to true by logger$lzycompute() once the logger field has been initialised.
    private volatile boolean bitmap$0;

    /**
     * Static forwarder for the default value of the constructor's fifth parameter
     * ({@code threadNamePrefix}). The actual value is supplied by the companion object
     * {@code ControllerChannelManager$} and is not visible in this file.
     */
    public static Option<String> $lessinit$greater$default$5() {
        return ControllerChannelManager$.MODULE$.$lessinit$greater$default$5();
    }

    /** Returns the logger name stored by the Logging trait setter {@code kafka$utils$Logging$_setter_$loggerName_$eq}. */
    @Override
    public String loggerName() {
        return this.loggerName;
    }

    /**
     * Slow path of the lazily initialised logger. While holding this instance's monitor it
     * creates the logger through {@code Logging$class.logger(this)} unless the
     * {@code bitmap$0} flag shows that this has already happened, then returns the field.
     *
     * @return the (now initialised) logger
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            if (this.bitmap$0) {
                // Another caller finished the initialisation before we got the monitor.
                return this.logger;
            }
            this.logger = Logging$class.logger(this);
            this.bitmap$0 = true;
            return this.logger;
        }
    }

    /**
     * Returns the logger, initialising it on first use. The volatile {@code bitmap$0} flag is
     * checked first so that the synchronized slow path is only taken until initialisation is done.
     */
    @Override
    public Logger logger() {
        if (!this.bitmap$0) {
            return this.logger$lzycompute();
        }
        return this.logger;
    }

    /** Returns the log identifier string currently stored in {@code logIdent}. */
    @Override
    public String logIdent() {
        return this.logIdent;
    }

    /** Replaces the stored log identifier string (Scala setter {@code logIdent_=}). */
    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    /** Returns the {@code Log4jController$} reference stored for the Logging trait. */
    @Override
    public Log4jController$ kafka$utils$Logging$$log4jController() {
        return this.kafka$utils$Logging$$log4jController;
    }

    // Trait-field setter for loggerName. Presumably invoked from Logging$class.$init$ during
    // construction -- TODO confirm against kafka.utils.Logging.
    @Override
    public void kafka$utils$Logging$_setter_$loggerName_$eq(String x$1) {
        this.loggerName = x$1;
    }

    // Trait-field setter for the log4jController reference. Presumably invoked from
    // Logging$class.$init$ during construction -- TODO confirm against kafka.utils.Logging.
    @Override
    public void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ x$1) {
        this.kafka$utils$Logging$$log4jController = x$1;
    }

    // The four methods below are plain forwarders to the static trait implementation in
    // Logging$class. Their behaviour (presumably TRACE-level logging) is defined there, not here.

    /** Forwards a message supplier to {@code Logging$class.trace}. */
    @Override
    public void trace(Function0<String> msg) {
        Logging$class.trace(this, msg);
    }

    /** Forwards a throwable supplier to {@code Logging$class.trace} and returns whatever it returns. */
    @Override
    public Object trace(Function0<Throwable> e) {
        return Logging$class.trace(this, e);
    }

    /** Forwards a message supplier together with a throwable supplier to {@code Logging$class.trace}. */
    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.trace(this, msg, e);
    }

    /**
     * Forwards {@code action} to {@code Logging$class.swallowTrace}. Presumably the action is run
     * and any failure is logged rather than propagated -- confirm against kafka.utils.Logging.
     */
    @Override
    public void swallowTrace(Function0<BoxedUnit> action) {
        Logging$class.swallowTrace(this, action);
    }

    // Forwarders to the static trait implementation in Logging$class for the debug family.
    // Their behaviour (presumably DEBUG-level logging) is defined there, not here.

    /** Returns the result of {@code Logging$class.isDebugEnabled} for this instance. */
    @Override
    public boolean isDebugEnabled() {
        return Logging$class.isDebugEnabled(this);
    }

    /** Forwards a message supplier to {@code Logging$class.debug}. */
    @Override
    public void debug(Function0<String> msg) {
        Logging$class.debug(this, msg);
    }

    /** Forwards a throwable supplier to {@code Logging$class.debug} and returns whatever it returns. */
    @Override
    public Object debug(Function0<Throwable> e) {
        return Logging$class.debug(this, e);
    }

    /** Forwards a message supplier together with a throwable supplier to {@code Logging$class.debug}. */
    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.debug(this, msg, e);
    }

    /**
     * Forwards {@code action} to {@code Logging$class.swallowDebug}. Presumably the action is run
     * and any failure is logged rather than propagated -- confirm against kafka.utils.Logging.
     */
    @Override
    public void swallowDebug(Function0<BoxedUnit> action) {
        Logging$class.swallowDebug(this, action);
    }

    // Forwarders to the static trait implementation in Logging$class for the info family.
    // Their behaviour (presumably INFO-level logging) is defined there, not here.

    /** Forwards a message supplier to {@code Logging$class.info}. */
    @Override
    public void info(Function0<String> msg) {
        Logging$class.info(this, msg);
    }

    /** Forwards a throwable supplier to {@code Logging$class.info} and returns whatever it returns. */
    @Override
    public Object info(Function0<Throwable> e) {
        return Logging$class.info(this, e);
    }

    /** Forwards a message supplier together with a throwable supplier to {@code Logging$class.info}. */
    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.info(this, msg, e);
    }

    /**
     * Forwards {@code action} to {@code Logging$class.swallowInfo}. Presumably the action is run
     * and any failure is logged rather than propagated -- confirm against kafka.utils.Logging.
     */
    @Override
    public void swallowInfo(Function0<BoxedUnit> action) {
        Logging$class.swallowInfo(this, action);
    }

    // Forwarders to the static trait implementation in Logging$class for the warn family.
    // Their behaviour (presumably WARN-level logging) is defined there, not here.

    /** Forwards a message supplier to {@code Logging$class.warn}; used by sendRequest for offline brokers. */
    @Override
    public void warn(Function0<String> msg) {
        Logging$class.warn(this, msg);
    }

    /** Forwards a throwable supplier to {@code Logging$class.warn} and returns whatever it returns. */
    @Override
    public Object warn(Function0<Throwable> e) {
        return Logging$class.warn(this, e);
    }

    /** Forwards a message supplier together with a throwable supplier to {@code Logging$class.warn}. */
    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.warn(this, msg, e);
    }

    /**
     * Forwards {@code action} to {@code Logging$class.swallowWarn}. Presumably the action is run
     * and any failure is logged rather than propagated -- confirm against kafka.utils.Logging.
     */
    @Override
    public void swallowWarn(Function0<BoxedUnit> action) {
        Logging$class.swallowWarn(this, action);
    }

    /**
     * Forwards {@code action} to {@code Logging$class.swallow}. Which log level it uses is
     * decided by the trait implementation and cannot be seen from this file.
     */
    @Override
    public void swallow(Function0<BoxedUnit> action) {
        Logging$class.swallow(this, action);
    }

    // Forwarders to the static trait implementation in Logging$class for the error family.
    // Their behaviour (presumably ERROR-level logging) is defined there, not here.

    /** Forwards a message supplier to {@code Logging$class.error}. */
    @Override
    public void error(Function0<String> msg) {
        Logging$class.error(this, msg);
    }

    /** Forwards a throwable supplier to {@code Logging$class.error} and returns whatever it returns. */
    @Override
    public Object error(Function0<Throwable> e) {
        return Logging$class.error(this, e);
    }

    /**
     * Forwards a message supplier together with a throwable supplier to {@code Logging$class.error};
     * used by removeExistingBroker to report teardown failures.
     */
    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.error(this, msg, e);
    }

    /**
     * Forwards {@code action} to {@code Logging$class.swallowError}. Presumably the action is run
     * and any failure is logged rather than propagated -- confirm against kafka.utils.Logging.
     */
    @Override
    public void swallowError(Function0<BoxedUnit> action) {
        Logging$class.swallowError(this, action);
    }

    // Forwarders to the static trait implementation in Logging$class for the fatal family.
    // Their behaviour (presumably FATAL-level logging) is defined there, not here.

    /** Forwards a message supplier to {@code Logging$class.fatal}. */
    @Override
    public void fatal(Function0<String> msg) {
        Logging$class.fatal(this, msg);
    }

    /** Forwards a throwable supplier to {@code Logging$class.fatal} and returns whatever it returns. */
    @Override
    public Object fatal(Function0<Throwable> e) {
        return Logging$class.fatal(this, e);
    }

    /** Forwards a message supplier together with a throwable supplier to {@code Logging$class.fatal}. */
    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.fatal(this, msg, e);
    }

    /**
     * Returns the live (mutable) map of per-broker channel state, keyed by boxed broker id.
     * No copy is made; the mutating methods of this class access it while holding {@code brokerLock}.
     */
    public HashMap<Object, ControllerBrokerStateInfo> brokerStateInfo() {
        return this.brokerStateInfo;
    }

    /** Returns the monitor object used to serialise access to the broker state map. */
    private Object brokerLock() {
        return this.brokerLock;
    }

    /**
     * Starts the request-send thread of every broker currently registered in the state map.
     * Runs under {@code brokerLock}; each entry's key (the broker id) is passed to
     * {@link #startRequestSendThread(int)}, which only starts threads that are still new.
     */
    public void startup() {
        synchronized (this.brokerLock()) {
            this.brokerStateInfo().foreach((Function1)new Serializable(this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ ControllerChannelManager $outer;

                public final void apply(Tuple2<Object, ControllerBrokerStateInfo> entry) {
                    // _1 of the map entry is the broker id.
                    this.$outer.startRequestSendThread(entry._1$mcI$sp());
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                }
            });
        }
    }

    /**
     * Tears down the channel to every registered broker.
     *
     * Runs under {@code brokerLock} and calls {@code removeExistingBroker} for each registered
     * broker state. That callback removes the broker's entry from {@code brokerStateInfo}, so the
     * values are first copied into an immutable list with {@code toList()}: iterating the live
     * {@code values()} view of a mutable HashMap while entries are being removed from it is not a
     * supported traversal and could skip entries.
     */
    public void shutdown() {
        synchronized (this.brokerLock()) {
            // Snapshot first: the callback below mutates brokerStateInfo.
            this.brokerStateInfo().values().toList().foreach((Function1)new Serializable(this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ ControllerChannelManager $outer;

                public final void apply(ControllerBrokerStateInfo brokerState) {
                    this.$outer.kafka$controller$ControllerChannelManager$$removeExistingBroker(brokerState);
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                }
            });
        }
    }

    /**
     * Queues a request for delivery to the given broker.
     *
     * Under {@code brokerLock} the broker's state is looked up. If it exists, a new
     * {@code QueueItem} wrapping the arguments is put on that broker's message queue. If the
     * broker is not registered, a warning is logged and the request is dropped.
     *
     * @param brokerId   id of the destination broker
     * @param apiKey     API key stored in the queue item
     * @param apiVersion optional API version stored in the queue item
     * @param request    request payload stored in the queue item
     * @param callback   response callback stored in the queue item (the generated default is {@code null})
     * @throws MatchError if the map lookup yields something that is neither {@code Some} nor {@code None}
     */
    public void sendRequest(int brokerId, ApiKeys apiKey, Option<Object> apiVersion, AbstractRequest request, Function1<AbstractRequestResponse, BoxedUnit> callback) {
        synchronized (this.brokerLock()) {
            Option lookup = this.brokerStateInfo().get((Object)BoxesRunTime.boxToInteger((int)brokerId));
            if (lookup instanceof Some) {
                ControllerBrokerStateInfo target = (ControllerBrokerStateInfo)((Some)lookup).x();
                target.messageQueue().put(new QueueItem(apiKey, apiVersion, request, callback));
            } else if (None$.MODULE$.equals(lookup)) {
                // Unknown broker: log and drop instead of failing the caller.
                this.warn((Function0<String>)new Serializable(this, brokerId, request){
                    public static final long serialVersionUID = 0L;
                    private final int brokerId$1;
                    private final AbstractRequest request$1;

                    public final String apply() {
                        return new StringOps(Predef$.MODULE$.augmentString("Not sending request %s to broker %d, since it is offline.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.request$1, BoxesRunTime.boxToInteger((int)this.brokerId$1)}));
                    }
                    {
                        this.brokerId$1 = brokerId$1;
                        this.request$1 = request$1;
                    }
                });
            } else {
                throw new MatchError((Object)lookup);
            }
        }
    }

    /**
     * Default value for the fifth parameter ({@code callback}) of {@code sendRequest}:
     * {@code null}, i.e. no callback.
     */
    public Function1<AbstractRequestResponse, BoxedUnit> sendRequest$default$5() {
        return null;
    }

    /**
     * Registers a broker and starts its request-send thread, unless a state entry for the
     * broker's id already exists, in which case nothing happens. Runs under {@code brokerLock}.
     *
     * @param broker the broker to connect to
     */
    public void addBroker(Broker broker) {
        synchronized (this.brokerLock()) {
            boolean alreadyRegistered = this.brokerStateInfo().contains((Object)BoxesRunTime.boxToInteger((int)broker.id()));
            if (!alreadyRegistered) {
                this.kafka$controller$ControllerChannelManager$$addNewBroker(broker);
                this.startRequestSendThread(broker.id());
            }
        }
    }

    /**
     * Tears down the channel to the given broker. Runs under {@code brokerLock}.
     *
     * The state is fetched with the map's {@code apply}, not {@code get}, so an unregistered
     * id is expected to fail with the map's missing-key exception rather than be ignored.
     *
     * @param brokerId id of the broker to remove
     */
    public void removeBroker(int brokerId) {
        synchronized (this.brokerLock()) {
            Object key = (Object)BoxesRunTime.boxToInteger((int)brokerId);
            ControllerBrokerStateInfo departing = (ControllerBrokerStateInfo)this.brokerStateInfo().apply(key);
            this.kafka$controller$ControllerChannelManager$$removeExistingBroker(departing);
        }
    }

    /**
     * Builds everything needed to talk to one broker and records it in {@code brokerStateInfo}:
     * a message queue, a {@code Node} for the broker's inter-broker endpoint, a {@code Selector}
     * and {@code NetworkClient}, and a non-daemon {@code RequestSendThread}. The thread is only
     * created here, not started. This method does not take {@code brokerLock} itself.
     *
     * The thread is named "Controller-C-to-broker-B-send-thread", prefixed with
     * "{@code <prefix>:}" when {@code threadNamePrefix} is defined.
     *
     * @param broker the broker to create channel state for
     * @throws MatchError if {@code threadNamePrefix} is neither {@code None} nor a {@code Some}
     */
    public void kafka$controller$ControllerChannelManager$$addNewBroker(Broker broker) {
        KafkaConfig config = this.kafka$controller$ControllerChannelManager$$config;
        LinkedBlockingQueue<QueueItem> messageQueue = new LinkedBlockingQueue<QueueItem>();
        this.debug((Function0<String>)new Serializable(this, broker){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ControllerChannelManager $outer;
            private final Broker broker$1;

            public final String apply() {
                return new StringOps(Predef$.MODULE$.augmentString("Controller %d trying to connect to broker %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.kafka$controller$ControllerChannelManager$$config.brokerId()), BoxesRunTime.boxToInteger((int)this.broker$1.id())}));
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.broker$1 = broker$1;
            }
        });

        // Resolve the endpoint the broker exposes for the inter-broker security protocol.
        BrokerEndPoint endPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol());
        Node brokerNode = new Node(broker.id(), endPoint.host(), endPoint.port());

        ChannelBuilder channelBuilder = ChannelBuilders.create(
                (SecurityProtocol)config.interBrokerSecurityProtocol(),
                (Mode)Mode.CLIENT,
                (LoginType)LoginType.SERVER,
                (java.util.Map)config.values(),
                (String)config.saslMechanismInterBrokerProtocol(),
                (boolean)config.saslInterBrokerHandshakeRequestEnable());

        // Single metric tag: "broker-id" -> the destination broker's id as a string.
        java.util.Map metricTags = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)"broker-id"), (Object)((Object)BoxesRunTime.boxToInteger((int)broker.id())).toString())}))).asJava();
        Selector selector = new Selector(-1, -1L, this.metrics, this.time, "controller-channel", metricTags, false, channelBuilder);

        // The metadata updater is seeded with exactly this one broker node.
        MetadataUpdater metadataUpdater = (MetadataUpdater)new ManualMetadataUpdater((List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Node[]{brokerNode}))).asJava());
        String clientId = ((Object)BoxesRunTime.boxToInteger((int)config.brokerId())).toString();
        int requestTimeoutMs = Predef$.MODULE$.Integer2int(config.requestTimeoutMs());
        NetworkClient networkClient = new NetworkClient((Selectable)selector, metadataUpdater, clientId, 1, 0L, -1, -1, requestTimeoutMs, this.time);

        // Pick the thread name depending on whether a prefix was configured.
        Option<String> prefixOpt = this.threadNamePrefix;
        String threadName;
        if (None$.MODULE$.equals(prefixOpt)) {
            threadName = new StringOps(Predef$.MODULE$.augmentString("Controller-%d-to-broker-%d-send-thread")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)config.brokerId()), BoxesRunTime.boxToInteger((int)broker.id())}));
        } else if (prefixOpt instanceof Some) {
            String prefix = (String)((Some)prefixOpt).x();
            threadName = new StringOps(Predef$.MODULE$.augmentString("%s:Controller-%d-to-broker-%d-send-thread")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{prefix, BoxesRunTime.boxToInteger((int)config.brokerId()), BoxesRunTime.boxToInteger((int)broker.id())}));
        } else {
            throw new MatchError(prefixOpt);
        }

        RequestSendThread sendThread = new RequestSendThread(config.brokerId(), this.controllerContext, messageQueue, networkClient, brokerNode, config, this.time, threadName);
        sendThread.setDaemon(false);
        this.brokerStateInfo().put((Object)BoxesRunTime.boxToInteger((int)broker.id()), (Object)new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, sendThread));
    }

    /**
     * Releases everything held for one broker: closes its network client, discards any
     * still-queued requests, shuts down its send thread and removes its entry from
     * {@code brokerStateInfo}. This method does not take {@code brokerLock} itself; the callers
     * in this class (shutdown, removeBroker) hold it.
     *
     * Any {@code Throwable} raised along the way is logged via {@code error} and swallowed, so
     * this method never propagates a failure to its caller.
     *
     * @param brokerState the per-broker state to tear down
     */
    public void kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerBrokerStateInfo brokerState) {
        try {
            // NOTE(review): the client is closed before the send thread is shut down. If that
            // thread can still be using the client at this point the two may race -- confirm
            // against RequestSendThread before relying on this order.
            brokerState.networkClient().close();
            brokerState.messageQueue().clear();
            brokerState.requestSendThread().shutdown();
            brokerState.brokerNode();
            this.brokerStateInfo().remove((Object)BoxesRunTime.boxToInteger((int)brokerState.brokerNode().id()));
        }
        // All four steps share one try: if an earlier step throws, the later ones (including the
        // thread shutdown and the map removal) are skipped and only the error is logged.
        catch (Throwable throwable) {
            this.error((Function0<String>)new Serializable(this){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Error while removing broker by the controller";
                }
            }, (Function0<Throwable>)new Serializable(this, throwable){
                public static final long serialVersionUID = 0L;
                private final Throwable e$1;

                public final Throwable apply() {
                    return this.e$1;
                }
                {
                    this.e$1 = e$1;
                }
            });
        }
    }

    /**
     * Starts the send thread registered for {@code brokerId} if it has not been started yet,
     * i.e. its state is still {@code Thread.State.NEW}; otherwise does nothing.
     *
     * The state is fetched with the map's {@code apply}, so the broker is expected to be
     * registered already. This method does not take {@code brokerLock} itself.
     *
     * @param brokerId id of the broker whose send thread should be running
     */
    public void startRequestSendThread(int brokerId) {
        ControllerBrokerStateInfo stateInfo = (ControllerBrokerStateInfo)this.brokerStateInfo().apply((Object)BoxesRunTime.boxToInteger((int)brokerId));
        RequestSendThread sendThread = stateInfo.requestSendThread();
        if (sendThread.getState() == Thread.State.NEW) {
            sendThread.start();
        }
    }

    /**
     * Creates the channel manager and prepares (but does not start) a channel for every broker
     * that {@code controllerContext.liveBrokers()} reports at construction time. The send threads
     * created here are started later by {@link #startup()}.
     *
     * @param controllerContext controller state; source of the initial live-broker set
     * @param config            broker configuration (controller id, security and timeout settings)
     * @param time              clock passed to the per-broker network components
     * @param metrics           metrics registry passed to the per-broker selectors
     * @param threadNamePrefix  optional prefix for the send-thread names
     */
    public ControllerChannelManager(ControllerContext controllerContext, KafkaConfig config, Time time, Metrics metrics, Option<String> threadNamePrefix) {
        this.controllerContext = controllerContext;
        this.kafka$controller$ControllerChannelManager$$config = config;
        this.time = time;
        this.metrics = metrics;
        this.threadNamePrefix = threadNamePrefix;
        // Logging trait initialiser; presumably populates loggerName and the log4jController
        // reference through the trait setters -- TODO confirm against kafka.utils.Logging.
        Logging$class.$init$(this);
        this.brokerStateInfo = new HashMap();
        this.brokerLock = new Object();
        // Log identifier of the form "[Channel manager on controller <brokerId>]: ".
        this.logIdent_$eq(new StringBuilder().append((Object)"[Channel manager on controller ").append((Object)BoxesRunTime.boxToInteger((int)config.brokerId())).append((Object)"]: ").toString());
        // Build channel state for each live broker. brokerLock is not taken here; the object is
        // still under construction at this point.
        controllerContext.liveBrokers().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ControllerChannelManager $outer;

            public final void apply(Broker x$1) {
                this.$outer.kafka$controller$ControllerChannelManager$$addNewBroker(x$1);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
    }
}

