package spark.network;

import akka.dispatch.Await$;
import akka.dispatch.ExecutionContext;
import akka.dispatch.ExecutionContext$;
import akka.dispatch.Future;
import akka.dispatch.Promise;
import akka.dispatch.Promise$;
import akka.util.Duration$;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.ScalaObject;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.Queue;
import scala.collection.mutable.SynchronizedQueue;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.TraitSetter;
import spark.Logging;
import spark.Utils$;

/* compiled from: ConnectionManager.scala */
@ScalaSignature(bytes = "\u0006\u0001\tEh!B\u0001\u0003\u0001\u00111!!E\"p]:,7\r^5p]6\u000bg.Y4fe*\u00111\u0001B\u0001\b]\u0016$xo\u001c:l\u0015\u0005)\u0011!B:qCJ\\7\u0003\u0002\u0001\b\u001fM\u0001\"\u0001C\u0007\u000e\u0003%Q!AC\u0006\u0002\t1\fgn\u001a\u0006\u0002\u0019\u0005!!.\u0019<b\u0013\tq\u0011B\u0001\u0004PE*,7\r\u001e\t\u0003!Ei\u0011\u0001B\u0005\u0003%\u0011\u0011q\u0001T8hO&tw\r\u0005\u0002\u0015/5\tQCC\u0001\u0017\u0003\u0015\u00198-\u00197b\u0013\tARCA\u0006TG\u0006d\u0017m\u00142kK\u000e$\b\u0002\u0003\u000e\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u000f\u0002\tA|'\u000f^\u0002\u0001!\t!R$\u0003\u0002\u001f+\t\u0019\u0011J\u001c;\t\u000b\u0001\u0002A\u0011A\u0011\u0002\rqJg.\u001b;?)\t\u0011C\u0005\u0005\u0002$\u00015\t!\u0001C\u0003\u001b?\u0001\u0007AD\u0002\u0003'\u0001\u00019#!D'fgN\fw-Z*uCR,8oE\u0002&\u000fMA\u0001\"K\u0013\u0003\u0006\u0004%\tAK\u0001\b[\u0016\u001c8/Y4f+\u0005Y\u0003CA\u0012-\u0013\ti#AA\u0004NKN\u001c\u0018mZ3\t\u0011=*#\u0011!Q\u0001\n-\n\u0001\"\\3tg\u0006<W\r\t\u0005\tc\u0015\u0012)\u0019!C\u0001e\u0005\u00192m\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:JIV\t1\u0007\u0005\u0002$i%\u0011QG\u0001\u0002\u0014\u0007>tg.Z2uS>tW*\u00198bO\u0016\u0014\u0018\n\u001a\u0005\to\u0015\u0012\t\u0011)A\u0005g\u0005!2m\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:JI\u0002B\u0001\"O\u0013\u0003\u0002\u0003\u0006IAO\u0001\u0012G>l\u0007\u000f\\3uS>t\u0007*\u00198eY\u0016\u0014\b\u0003\u0002\u000b<{}J!\u0001P\u000b\u0003\u0013\u0019+hn\u0019;j_:\f\u0004C\u0001 &\u001b\u0005\u0001\u0001C\u0001\u000bA\u0013\t\tUC\u0001\u0003V]&$\b\"\u0002\u0011&\t\u0003\u0019E\u0003B\u001fE\u000b\u001aCQ!\u000b\"A\u0002-BQ!\r\"A\u0002MBQ!\u000f\"A\u0002iBq\u0001S\u0013A\u0002\u0013\u0005\u0011*\u0001\u0006bG.lUm]:bO\u0016,\u0012A\u0013\t\u0004)-[\u0013B\u0001'\u0016\u0005\u0019y\u0005\u000f^5p]\"9a*\na\u0001\n\u0003y\u0015AD1dW6+7o]1hK~#S-\u001d\u000b\u0003\u007fACq!U'\u0002\u0002\u0003\u0007!*A\u0002yIEBaaU\u0013!B\u0013Q\u0015aC1dW6+7o]1hK\u0002Bq!V\u0013A\u0002\u0013\u0005a+A\u0005biR,W\u000e\u001d;fIV\tq\u000b\u0005\u0002\u00151&\u0011\u0011,\u0006\u0002\b\u0005>|G.Z1o\u0011\u001dYV\u00051A\u0005\u0002q\u000bQ\"\u0019;uK6\u0004H/\u001a3`I\u0015\fHCA ^\u0011\u001d\t&,!AA\u0002]CaaX\u0013!B\u00139\u0016AC1ui\u0016l\u0007\u000f^3eA!9\u0011-\na\u0001\n\u00031\u0016!B1dW\u0016$\u0007bB2&\u0001\u0004%\t\u0001Z\u0001\nC\u000e\\W\rZ0%KF$\"aP3\t\u000fE\u0013\u0017\u0011!a\u0001/\"1q-\nQ!\n]\u000ba!Y2lK\u0012\u0004\u0003\"B5&\t\u0003Q\u0017\u0001C7be.$uN\\3\u0015\u0003}Bq\u0001\u001c\u0001C\u0002\u0013\u0005Q.\u0001\u0005tK2,7\r^8s+\u0005q\u0007CA8w\u001b\u0005\u0001(BA9s\u0003\r\u0019\b/\u001b\u0006\u0003gR\f\u0001b\u00195b]:,Gn\u001d\u0006\u0003k.\t1A\\5p\u0013\t9\bO\u0001\tBEN$(/Y2u'\u0016dWm\u0019;pe\"1\u0011\u0010\u0001Q\u0001\n9\f\u0011b]3mK\u000e$xN\u001d\u0011\t\u000fm\u0004!\u0019!C\u0001y\u0006)\u0002.\u00198eY\u0016lUm]:bO\u0016,\u00050Z2vi>\u0014X#A?\u0011\u0007y\f9!D\u0001��\u0015\u0011\t\t!a\u0001\u0002\u0015\r|gnY;se\u0016tGOC\u0002\u0002\u0006-\tA!\u001e;jY&\u0019\u0011\u0011B@\u0003\u001f\u0015CXmY;u_J\u001cVM\u001d<jG\u0016Dq!!\u0004\u0001A\u0003%Q0\u0001\fiC:$G.Z'fgN\fw-Z#yK\u000e,Ho\u001c:!\u0011%\t\t\u0002\u0001b\u0001\n\u0003\t\u0019\"A\u0007tKJ4XM]\"iC:tW\r\\\u000b\u0003\u0003+\u0001B!a\u0006\u0002\u001a5\t!/C\u0002\u0002\u001cI\u00141cU3sm\u0016\u00148k\\2lKR\u001c\u0005.\u00198oK2D\u0001\"a\b\u0001A\u0003%\u0011QC\u0001\u000fg\u0016\u0014h/\u001a:DQ\u0006tg.\u001a7!\u0011%\t\u0019\u0003\u0001b\u0001\n\u0003\t)#\u0001\td_:tWm\u0019;j_:\u001c()_&fsV\u0011\u0011q\u0005\n\u0007\u0003S\t\t$!\u0014\u0007\u000f\u0005-\u0012Q\u0006\u0001\u0002(\taAH]3gS:,W.\u001a8u}!A\u0011q\u0006\u0001!\u0002\u0013\t9#A\td_:tWm\u0019;j_:\u001c()_&fs\u0002\u0002\u0002\"a\r\u0002>\u0005\u0005\u0013qI\u0007\u0003\u0003kQA!a\u000e\u0002:\u00059Q.\u001e;bE2,'bAA\u001e+\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\t\u0005}\u0012Q\u0007\u0002\b\u0011\u0006\u001c\b.T1q!\u0011\t9\"a\u0011\n\u0007\u0005\u0015#O\u0001\u0007TK2,7\r^5p].+\u0017\u0010E\u0002$\u0003\u0013J1!a\u0013\u0003\u0005)\u0019uN\u001c8fGRLwN\u001c\t\t\u0003g\ty%!\u0011\u0002H%!\u0011\u0011KA\u001b\u0005=\u0019\u0016P\\2ie>t\u0017N_3e\u001b\u0006\u0004\b\"CA+\u0001\t\u0007I\u0011AA,\u0003=\u0019wN\u001c8fGRLwN\\:Cs&#WCAA-%\u0019\tY&!\u0019\u0002j\u00199\u00111FA/\u0001\u0005e\u0003\u0002CA0\u0001\u0001\u0006I!!\u0017\u0002!\r|gN\\3di&|gn\u001d\"z\u0013\u0012\u0004\u0003cBA\u001a\u0003{\u0019\u00141\r\t\u0004G\u0005\u0015\u0014bAA4\u0005\t\t2+\u001a8eS:<7i\u001c8oK\u000e$\u0018n\u001c8\u0011\u000f\u0005M\u0012qJ\u001a\u0002d!I\u0011Q\u000e\u0001C\u0002\u0013\u0005\u0011qN\u0001\u0010[\u0016\u001c8/Y4f'R\fG/^:fgV\u0011\u0011\u0011\u000f\t\u0007\u0003g\ti\u0004H\u001f\t\u0011\u0005U\u0004\u0001)A\u0005\u0003c\n\u0001#\\3tg\u0006<Wm\u0015;biV\u001cXm\u001d\u0011\t\u0013\u0005e\u0004A1A\u0005\u0002\u0005m\u0014AE2p]:,7\r^5p]J+\u0017/^3tiN,\"!! \u0013\r\u0005}\u0014\u0011MA5\r\u001d\tY#!!\u0001\u0003{B\u0001\"a!\u0001A\u0003%\u0011QP\u0001\u0014G>tg.Z2uS>t'+Z9vKN$8\u000f\t\u0005\n\u0003\u000f\u0003!\u0019!C\u0001\u0003\u0013\u000b\u0011d[3z\u0013:$XM]3ti\u000eC\u0017M\\4f%\u0016\fX/Z:ugV\u0011\u00111\u0012\t\u0007\u0003g\ti)!%\n\t\u0005=\u0015Q\u0007\u0002\u0012'ft7\r\u001b:p]&TX\rZ)vKV,\u0007C\u0002\u000b\u0002\u0014\u0006\u0005C$C\u0002\u0002\u0016V\u0011a\u0001V;qY\u0016\u0014\u0004\u0002CAM\u0001\u0001\u0006I!a#\u00025-,\u00170\u00138uKJ,7\u000f^\"iC:<WMU3rk\u0016\u001cHo\u001d\u0011\t\u0013\u0005u\u0005A1A\u0005\u0002\u0005}\u0015aE:f]\u0012lUm]:bO\u0016\u0014V-];fgR\u001cXCAAQ!\u0019\t\u0019$a)\u0002(&!\u0011QUA\u001b\u0005\u0015\tV/Z;f!\u0019!\u00121S\u0016\u0002d!A\u00111\u0016\u0001!\u0002\u0013\t\t+\u0001\u000btK:$W*Z:tC\u001e,'+Z9vKN$8\u000f\t\u0005\n\u0003_\u0003!\u0019!C\u0002\u0003c\u000b\u0011CZ;ukJ,W\t_3d\u0007>tG/\u001a=u+\t\t\u0019L\u0005\u0004\u00026\u0006\r\u00171\u001a\u0004\u0007\u0003W\u0001\u0001!a-\u000b\t\u0005e\u00161X\u0001\u0011\u000bb,7-\u001e;j_:\u001cuN\u001c;fqRTA!!0\u0002@\u0006AA-[:qCR\u001c\u0007N\u0003\u0002\u0002B\u0006!\u0011m[6b!\u0011\t)-a2\u000e\u0005\u0005m\u0016\u0002BAe\u0003w\u0013\u0001#\u0012=fGV$\u0018n\u001c8D_:$X\r\u001f;\u0011\u0007y\fi-C\u0002\u0002P~\u0014\u0001\"\u0012=fGV$xN\u001d\u0005\t\u0003'\u0004\u0001\u0015!\u0003\u00024\u0006\u0011b-\u001e;ve\u0016,\u00050Z2D_:$X\r\u001f;!\u0011%\t9\u000e\u0001a\u0001\n\u0003\tI.A\tp]J+7-Z5wK\u000e\u000bG\u000e\u001c2bG.,\"!a7\u0011\u000fQ\ti.!94\u0015&\u0019\u0011q\\\u000b\u0003\u0013\u0019+hn\u0019;j_:\u0014\u0004cA\u0012\u0002d&\u0019\u0011Q\u001d\u0002\u0003\u001b\t+hMZ3s\u001b\u0016\u001c8/Y4f\u0011%\tI\u000f\u0001a\u0001\n\u0003\tY/A\u000bp]J+7-Z5wK\u000e\u000bG\u000e\u001c2bG.|F%Z9\u0015\u0007}\ni\u000fC\u0005R\u0003O\f\t\u00111\u0001\u0002\\\"A\u0011\u0011\u001f\u0001!B\u0013\tY.\u0001\np]J+7-Z5wK\u000e\u000bG\u000e\u001c2bG.\u0004\u0003\u0002CA{\u0001\t\u0007I\u0011\u0001\u001a\u0002\u0005%$\u0007bBA}\u0001\u0001\u0006IaM\u0001\u0004S\u0012\u0004\u0003\"CA\u007f\u0001\t\u0007I\u0011AA��\u00039\u0019X\r\\3di>\u0014H\u000b\u001b:fC\u0012,\"A!\u0001\u0011\u0007!\u0011\u0019!C\u0002\u0003\u0006%\u0011a\u0001\u00165sK\u0006$\u0007\u0002\u0003B\u0005\u0001\u0001\u0006IA!\u0001\u0002\u001fM,G.Z2u_J$\u0006N]3bI\u0002BaA!\u0004\u0001\t\u0013Q\u0017a\u0001:v]\"9!\u0011\u0003\u0001\u0005\n\tM\u0011\u0001E1dG\u0016\u0004HoQ8o]\u0016\u001cG/[8o)\ry$Q\u0003\u0005\t\u0005/\u0011y\u00011\u0001\u0002B\u0005\u00191.Z=\t\u000f\tm\u0001\u0001\"\u0003\u0003\u001e\u0005i\u0011\r\u001a3D_:tWm\u0019;j_:$2a\u0010B\u0010\u0011!\u0011\tC!\u0007A\u0002\u0005\u001d\u0013AC2p]:,7\r^5p]\"9!Q\u0005\u0001\u0005\n\t\u001d\u0012\u0001\u0005:f[>4XmQ8o]\u0016\u001cG/[8o)\ry$\u0011\u0006\u0005\t\u0005C\u0011\u0019\u00031\u0001\u0002H!9!Q\u0006\u0001\u0005\n\t=\u0012!\u00065b]\u0012dWmQ8o]\u0016\u001cG/[8o\u000bJ\u0014xN\u001d\u000b\u0006\u007f\tE\"1\u0007\u0005\t\u0005C\u0011Y\u00031\u0001\u0002H!A!Q\u0007B\u0016\u0001\u0004\u00119$A\u0001f!\u0011\u0011ID!\u0013\u000f\t\tm\"Q\t\b\u0005\u0005{\u0011\u0019%\u0004\u0002\u0003@)\u0019!\u0011I\u000e\u0002\rq\u0012xn\u001c;?\u0013\u00051\u0012b\u0001B$+\u00059\u0001/Y2lC\u001e,\u0017\u0002\u0002B&\u0005\u001b\u0012\u0011\"\u0012=dKB$\u0018n\u001c8\u000b\u0007\t\u001dS\u0003C\u0004\u0003R\u0001!IAa\u0015\u00027\rD\u0017M\\4f\u0007>tg.Z2uS>t7*Z=J]R,'/Z:u)\u0015y$Q\u000bB,\u0011!\u0011\tCa\u0014A\u0002\u0005\u001d\u0003b\u0002B-\u0005\u001f\u0002\r\u0001H\u0001\u0004_B\u001c\bb\u0002B/\u0001\u0011%!qL\u0001\u000fe\u0016\u001cW-\u001b<f\u001b\u0016\u001c8/Y4f)\u0015y$\u0011\rB2\u0011!\u0011\tCa\u0017A\u0002\u0005\u001d\u0003BB\u0015\u0003\\\u0001\u00071\u0006C\u0004\u0003h\u0001!IA!\u001b\u0002\u001b!\fg\u000e\u001a7f\u001b\u0016\u001c8/Y4f)\u0015y$1\u000eB7\u0011\u0019\t$Q\ra\u0001g!1\u0011F!\u001aA\u0002-BqA!\u001d\u0001\t\u0013\u0011\u0019(A\u0006tK:$W*Z:tC\u001e,G#B \u0003v\t]\u0004BB\u0019\u0003p\u0001\u00071\u0007\u0003\u0004*\u0005_\u0002\ra\u000b\u0005\b\u0005w\u0002A\u0011\u0001B?\u0003M\u0019XM\u001c3NKN\u001c\u0018mZ3SK2L\u0017M\u00197z)\u0019\u0011yH!\"\u0003\bB)\u0011Q\u0019BA\u0015&!!1QA^\u0005\u00191U\u000f^;sK\"1\u0011G!\u001fA\u0002MBa!\u000bB=\u0001\u0004Y\u0003b\u0002BF\u0001\u0011\u0005!QR\u0001\u0018g\u0016tG-T3tg\u0006<WMU3mS\u0006\u0014G._*z]\u000e$RA\u0013BH\u0005#Ca!\rBE\u0001\u0004\u0019\u0004BB\u0015\u0003\n\u0002\u00071\u0006C\u0004\u0003\u0016\u0002!\tAa&\u0002!=t'+Z2fSZ,W*Z:tC\u001e,GcA \u0003\u001a\"A!1\u0014BJ\u0001\u0004\u0011i*\u0001\u0005dC2d'-Y2l!\u0019!\u0012Q\\\u00164\u0015\"1!\u0011\u0015\u0001\u0005\u0002)\fAa\u001d;pa\u001eA!Q\u0015\u0002\t\u0006\u0011\u00119+A\tD_:tWm\u0019;j_:l\u0015M\\1hKJ\u00042a\tBU\r\u001d\t!\u0001#\u0002\u0005\u0005W\u001bBA!+\b'!9\u0001E!+\u0005\u0002\t=FC\u0001BT\u0011!\u0011\u0019L!+\u0005\u0002\tU\u0016\u0001B7bS:$2a\u0010B\\\u0011!\u0011IL!-A\u0002\tm\u0016\u0001B1sON\u0004R\u0001\u0006B_\u0005\u0003L1Aa0\u0016\u0005\u0015\t%O]1z!\u0011\u0011\u0019M!3\u000f\u0007Q\u0011)-C\u0002\u0003HV\ta\u0001\u0015:fI\u00164\u0017\u0002\u0002Bf\u0005\u001b\u0014aa\u0015;sS:<'b\u0001Bd+!A!\u0011\u001bBU\t\u0003\u0011\u0019.A\u000buKN$8+Z9vK:$\u0018.\u00197TK:$\u0017N\\4\u0015\u0007}\u0012)\u000eC\u0004\u0003X\n=\u0007\u0019\u0001\u0012\u0002\u000f5\fg.Y4fe\"A!1\u001cBU\t\u0003\u0011i.A\nuKN$\b+\u0019:bY2,GnU3oI&tw\rF\u0002@\u0005?DqAa6\u0003Z\u0002\u0007!\u0005\u0003\u0005\u0003d\n%F\u0011\u0001Bs\u0003u!Xm\u001d;QCJ\fG\u000e\\3m\t\u0016\u001c'/Z1tS:<7+\u001a8eS:<GcA \u0003h\"9!q\u001bBq\u0001\u0004\u0011\u0003\u0002\u0003Bv\u0005S#\tA!<\u0002+Q,7\u000f^\"p]RLg.^8vgN+g\u000eZ5oOR\u0019qHa<\t\u000f\t]'\u0011\u001ea\u0001E\u0001")
/* loaded from: input_file:spark/network/ConnectionManager.class */
public class ConnectionManager implements Logging {
    private final AbstractSelector selector;
    private final ExecutorService handleMessageExecutor;
    private final ServerSocketChannel serverChannel;
    private final HashMap<SelectionKey, Connection> connectionsByKey;
    private final HashMap<ConnectionManagerId, SendingConnection> connectionsById;
    private final HashMap<Object, MessageStatus> messageStatuses;
    private final HashMap<ConnectionManagerId, SendingConnection> connectionRequests;
    private final SynchronizedQueue<Tuple2<SelectionKey, Object>> keyInterestChangeRequests;
    private final Queue<Tuple2<Message, SendingConnection>> sendMessageRequests;
    private final ExecutionContext futureExecContext;
    private Function2<BufferMessage, ConnectionManagerId, Option<Message>> onReceiveCallback;
    private final ConnectionManagerId id;
    private final Thread selectorThread;
    private transient Logger spark$Logging$$log_;

    /* compiled from: ConnectionManager.scala */
    /* loaded from: input_file:spark/network/ConnectionManager$MessageStatus.class */
    public class MessageStatus implements ScalaObject {
        private final Message message;
        private final ConnectionManagerId connectionManagerId;
        private final Function1<MessageStatus, BoxedUnit> completionHandler;
        private Option<Message> ackMessage;
        private boolean attempted;
        private boolean acked;
        public final ConnectionManager $outer;

        public Message message() {
            return this.message;
        }

        public ConnectionManagerId connectionManagerId() {
            return this.connectionManagerId;
        }

        public Option<Message> ackMessage() {
            return this.ackMessage;
        }

        public void ackMessage_$eq(Option<Message> option) {
            this.ackMessage = option;
        }

        public boolean attempted() {
            return this.attempted;
        }

        public void attempted_$eq(boolean z) {
            this.attempted = z;
        }

        public boolean acked() {
            return this.acked;
        }

        public void acked_$eq(boolean z) {
            this.acked = z;
        }

        public void markDone() {
            this.completionHandler.apply(this);
        }

        public ConnectionManager spark$network$ConnectionManager$MessageStatus$$$outer() {
            return this.$outer;
        }

        public MessageStatus(ConnectionManager connectionManager, Message message, ConnectionManagerId connectionManagerId, Function1<MessageStatus, BoxedUnit> function1) {
            this.message = message;
            this.connectionManagerId = connectionManagerId;
            this.completionHandler = function1;
            if (connectionManager == null) {
                throw new NullPointerException();
            }
            this.$outer = connectionManager;
            this.ackMessage = None$.MODULE$;
            this.attempted = false;
            this.acked = false;
        }
    }

    public static final void testContinuousSending(ConnectionManager connectionManager) {
        ConnectionManager$.MODULE$.testContinuousSending(connectionManager);
    }

    public static final void testParallelDecreasingSending(ConnectionManager connectionManager) {
        ConnectionManager$.MODULE$.testParallelDecreasingSending(connectionManager);
    }

    public static final void testParallelSending(ConnectionManager connectionManager) {
        ConnectionManager$.MODULE$.testParallelSending(connectionManager);
    }

    public static final void testSequentialSending(ConnectionManager connectionManager) {
        ConnectionManager$.MODULE$.testSequentialSending(connectionManager);
    }

    public static final void main(String[] strArr) {
        ConnectionManager$.MODULE$.main(strArr);
    }

    @Override // spark.Logging
    public final Logger spark$Logging$$log_() {
        return this.spark$Logging$$log_;
    }

    @Override // spark.Logging
    @TraitSetter
    public final void spark$Logging$$log__$eq(Logger logger) {
        this.spark$Logging$$log_ = logger;
    }

    @Override // spark.Logging
    public Logger log() {
        return Logging.Cclass.log(this);
    }

    @Override // spark.Logging
    public void logInfo(Function0<String> function0) {
        Logging.Cclass.logInfo(this, function0);
    }

    @Override // spark.Logging
    public void logDebug(Function0<String> function0) {
        Logging.Cclass.logDebug(this, function0);
    }

    @Override // spark.Logging
    public void logTrace(Function0<String> function0) {
        Logging.Cclass.logTrace(this, function0);
    }

    @Override // spark.Logging
    public void logWarning(Function0<String> function0) {
        Logging.Cclass.logWarning(this, function0);
    }

    @Override // spark.Logging
    public void logError(Function0<String> function0) {
        Logging.Cclass.logError(this, function0);
    }

    @Override // spark.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.Cclass.logInfo(this, function0, th);
    }

    @Override // spark.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.Cclass.logDebug(this, function0, th);
    }

    @Override // spark.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.Cclass.logTrace(this, function0, th);
    }

    @Override // spark.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.Cclass.logWarning(this, function0, th);
    }

    @Override // spark.Logging
    public void logError(Function0<String> function0, Throwable th) {
        Logging.Cclass.logError(this, function0, th);
    }

    @Override // spark.Logging
    public void initLogging() {
        Logging.Cclass.initLogging(this);
    }

    public AbstractSelector selector() {
        return this.selector;
    }

    public ExecutorService handleMessageExecutor() {
        return this.handleMessageExecutor;
    }

    public ServerSocketChannel serverChannel() {
        return this.serverChannel;
    }

    public HashMap<SelectionKey, Connection> connectionsByKey() {
        return this.connectionsByKey;
    }

    public HashMap<ConnectionManagerId, SendingConnection> connectionsById() {
        return this.connectionsById;
    }

    public HashMap<Object, MessageStatus> messageStatuses() {
        return this.messageStatuses;
    }

    public HashMap<ConnectionManagerId, SendingConnection> connectionRequests() {
        return this.connectionRequests;
    }

    public SynchronizedQueue<Tuple2<SelectionKey, Object>> keyInterestChangeRequests() {
        return this.keyInterestChangeRequests;
    }

    public Queue<Tuple2<Message, SendingConnection>> sendMessageRequests() {
        return this.sendMessageRequests;
    }

    public ExecutionContext futureExecContext() {
        return this.futureExecContext;
    }

    public Function2<BufferMessage, ConnectionManagerId, Option<Message>> onReceiveCallback() {
        return this.onReceiveCallback;
    }

    public void onReceiveCallback_$eq(Function2<BufferMessage, ConnectionManagerId, Option<Message>> function2) {
        this.onReceiveCallback = function2;
    }

    public ConnectionManagerId id() {
        return this.id;
    }

    public Thread selectorThread() {
        return this.selectorThread;
    }

    public final void spark$network$ConnectionManager$$run() {
        while (!selectorThread().isInterrupted()) {
            try {
                ((HashMap) connectionRequests().filter(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$run$1(this))).foreach(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$run$2(this));
                MatchError sendMessageRequests = sendMessageRequests();
                synchronized (sendMessageRequests) {
                    while (!sendMessageRequests().isEmpty()) {
                        Tuple2 tuple2 = (Tuple2) sendMessageRequests().dequeue();
                        if (tuple2 == null) {
                            sendMessageRequests = new MatchError(tuple2);
                            throw sendMessageRequests;
                        }
                        Tuple2 tuple22 = new Tuple2(tuple2._1(), tuple2._2());
                        Message message = (Message) tuple22._1();
                        sendMessageRequests = (SendingConnection) tuple22._2();
                        sendMessageRequests.send(message);
                    }
                }
                while (!keyInterestChangeRequests().isEmpty()) {
                    Tuple2 tuple23 = (Tuple2) keyInterestChangeRequests().dequeue();
                    if (tuple23 == null) {
                        throw new MatchError(tuple23);
                    }
                    Tuple2 tuple24 = new Tuple2(tuple23._1(), tuple23._2());
                    SelectionKey selectionKey = (SelectionKey) tuple24._1();
                    int _2$mcI$sp = tuple24._2$mcI$sp();
                    Connection connection = (Connection) connectionsByKey().apply(selectionKey);
                    int interestOps = selectionKey.interestOps();
                    selectionKey.interestOps(_2$mcI$sp);
                    logTrace(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$run$3(this, _2$mcI$sp, connection, interestOps));
                }
                int select = selector().select();
                if (select == 0) {
                    logDebug(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$run$4(this, select));
                }
                if (selectorThread().isInterrupted()) {
                    logInfo(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$run$5(this));
                    return;
                }
                Iterator<SelectionKey> it = selector().selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey next = it.next();
                    it.remove();
                    if (next.isValid()) {
                        if (next.isAcceptable()) {
                            acceptConnection(next);
                        } else if (next.isConnectable()) {
                            ((SendingConnection) connectionsByKey().apply(next)).finishConnect();
                        } else if (next.isReadable()) {
                            ((Connection) connectionsByKey().apply(next)).read();
                        } else if (next.isWritable()) {
                            ((Connection) connectionsByKey().apply(next)).write();
                        }
                    }
                }
            } catch (Exception e) {
                logError(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$run$6(this), e);
                return;
            }
        }
    }

    private void acceptConnection(SelectionKey selectionKey) {
        ReceivingConnection receivingConnection = new ReceivingConnection(((ServerSocketChannel) selectionKey.channel()).accept(), selector());
        receivingConnection.onReceive(new ConnectionManager$$anonfun$acceptConnection$1(this));
        receivingConnection.onClose(new ConnectionManager$$anonfun$acceptConnection$2(this));
        spark$network$ConnectionManager$$addConnection(receivingConnection);
        logInfo(new ConnectionManager$$anonfun$acceptConnection$3(this, receivingConnection));
    }

    public final void spark$network$ConnectionManager$$addConnection(Connection connection) {
        connectionsByKey().$plus$eq(new Tuple2(connection.key(), connection));
        if (connection instanceof SendingConnection) {
            SendingConnection sendingConnection = (SendingConnection) connection;
            connectionsById().$plus$eq(new Tuple2(sendingConnection.remoteConnectionManagerId(), sendingConnection));
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        connection.onKeyInterestChange(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$addConnection$1(this));
        connection.onException(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$addConnection$2(this));
        connection.onClose(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$addConnection$3(this));
    }

    public final void spark$network$ConnectionManager$$removeConnection(Connection connection) {
        connectionsByKey().$minus$eq(connection.key());
        if (connection instanceof SendingConnection) {
            ConnectionManagerId remoteConnectionManagerId = ((SendingConnection) connection).remoteConnectionManagerId();
            logInfo(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$removeConnection$1(this, remoteConnectionManagerId));
            connectionsById().$minus$eq(remoteConnectionManagerId);
            Throwable messageStatuses = messageStatuses();
            synchronized (messageStatuses) {
                ((IterableLike) messageStatuses().values().filter(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$removeConnection$2(this, remoteConnectionManagerId))).foreach(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$removeConnection$3(this));
                messageStatuses().retain(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$removeConnection$4(this, remoteConnectionManagerId));
                messageStatuses = messageStatuses;
                return;
            }
        }
        if (connection instanceof ReceivingConnection) {
            ConnectionManagerId remoteConnectionManagerId2 = ((ReceivingConnection) connection).remoteConnectionManagerId();
            logInfo(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$removeConnection$5(this, remoteConnectionManagerId2));
            Option find = connectionsById().keys().find(new ConnectionManager$$anonfun$3(this, remoteConnectionManagerId2));
            ConnectionManagerId connectionManagerId = (ConnectionManagerId) (!find.isEmpty() ? find.get() : new Option$.anonfun.orNull.1(find, Predef$.MODULE$.conforms()).apply());
            if (connectionManagerId == null) {
                logError(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$removeConnection$6(this));
                return;
            }
            logInfo(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$removeConnection$7(this, connectionManagerId));
            ((SendingConnection) connectionsById().apply(connectionManagerId)).close();
            connectionsById().$minus$eq(connectionManagerId);
            Throwable messageStatuses2 = messageStatuses();
            synchronized (messageStatuses2) {
                messageStatuses().values().withFilter(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$removeConnection$8(this, connectionManagerId)).foreach(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$removeConnection$9(this));
                messageStatuses().retain(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$removeConnection$10(this, connectionManagerId));
                messageStatuses2 = messageStatuses2;
            }
        }
    }

    public final void spark$network$ConnectionManager$$handleConnectionError(Connection connection, Exception exc) {
        logInfo(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$handleConnectionError$1(this, connection));
        spark$network$ConnectionManager$$removeConnection(connection);
    }

    public final void spark$network$ConnectionManager$$changeConnectionKeyInterest(Connection connection, int i) {
        keyInterestChangeRequests().$plus$eq(new Tuple2(connection.key(), BoxesRunTime.boxToInteger(i)));
    }

    public final void spark$network$ConnectionManager$$receiveMessage(Connection connection, Message message) {
        ConnectionManagerId fromSocketAddress = ConnectionManagerId$.MODULE$.fromSocketAddress(message.senderAddress());
        logDebug(new ConnectionManager$$anonfun$spark$network$ConnectionManager$$receiveMessage$1(this, message, fromSocketAddress));
        handleMessageExecutor().execute(new ConnectionManager$$anon$5(this, message, fromSocketAddress));
    }

    /* JADX WARN: Code restructure failed: missing block: B:32:0x00b3, code lost:
    
        if (r0.equals(r0) != false) goto L26;
     */
    /* JADX WARN: Type inference failed for: r0v41, types: [java.lang.Throwable, boolean] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public final void spark$network$ConnectionManager$$handleMessage(spark.network.ConnectionManagerId r8, spark.network.Message r9) {
        /*
            Method dump skipped, instructions count: 445
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: spark.network.ConnectionManager.spark$network$ConnectionManager$$handleMessage(spark.network.ConnectionManagerId, spark.network.Message):void");
    }

    private void sendMessage(ConnectionManagerId connectionManagerId, Message message) {
        SendingConnection sendingConnection = (SendingConnection) connectionsById().getOrElse(ConnectionManagerId$.MODULE$.fromSocketAddress(connectionManagerId.toSocketAddress()), new ConnectionManager$$anonfun$7(this, connectionManagerId));
        message.senderAddress_$eq(id().toSocketAddress());
        logDebug(new ConnectionManager$$anonfun$sendMessage$1(this, connectionManagerId, message));
        Throwable sendMessageRequests = sendMessageRequests();
        synchronized (sendMessageRequests) {
            sendMessageRequests().$plus$eq(new Tuple2(message, sendingConnection));
            sendMessageRequests = sendMessageRequests;
            selector().wakeup();
        }
    }

    public Future<Option<Message>> sendMessageReliably(ConnectionManagerId connectionManagerId, Message message) {
        Promise apply = Promise$.MODULE$.apply(futureExecContext());
        MessageStatus messageStatus = new MessageStatus(this, message, connectionManagerId, new ConnectionManager$$anonfun$8(this, apply));
        Throwable messageStatuses = messageStatuses();
        synchronized (messageStatuses) {
            messageStatuses().$plus$eq(new Tuple2(BoxesRunTime.boxToInteger(message.id()), messageStatus));
            messageStatuses = messageStatuses;
            sendMessage(connectionManagerId, message);
            return apply.future();
        }
    }

    public Option<Message> sendMessageReliablySync(ConnectionManagerId connectionManagerId, Message message) {
        return (Option) Await$.MODULE$.result(sendMessageReliably(connectionManagerId, message), Duration$.MODULE$.Inf());
    }

    public void onReceiveMessage(Function2<Message, ConnectionManagerId, Option<Message>> function2) {
        onReceiveCallback_$eq(function2);
    }

    public void stop() {
        selectorThread().interrupt();
        selectorThread().join();
        selector().close();
        connectionsByKey().values().foreach(new ConnectionManager$$anonfun$stop$1(this));
        if (connectionsByKey().size() != 0) {
            logWarning(new ConnectionManager$$anonfun$stop$2(this));
        }
        handleMessageExecutor().shutdown();
        logInfo(new ConnectionManager$$anonfun$stop$3(this));
    }

    public final String intToOpStr$1(int i) {
        ArrayBuffer apply = ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);
        if ((i & 1) != 0) {
            apply.$plus$eq("READ");
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        if ((i & 4) != 0) {
            apply.$plus$eq("WRITE");
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        if ((i & 8) != 0) {
            apply.$plus$eq("CONNECT");
        } else {
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
        if ((i & 16) != 0) {
            apply.$plus$eq("ACCEPT");
        } else {
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
        return apply.size() > 0 ? (String) apply.reduceLeft(new ConnectionManager$$anonfun$intToOpStr$1$1(this)) : " ";
    }

    public final SendingConnection startNewConnection$1(ConnectionManagerId connectionManagerId) {
        return (SendingConnection) connectionRequests().getOrElseUpdate(connectionManagerId, new ConnectionManager$$anonfun$6(this, connectionManagerId, new InetSocketAddress(connectionManagerId.host(), connectionManagerId.port())));
    }

    public ConnectionManager(int i) {
        spark$Logging$$log__$eq(null);
        this.selector = SelectorProvider.provider().openSelector();
        this.handleMessageExecutor = Executors.newFixedThreadPool(Predef$.MODULE$.augmentString(System.getProperty("spark.core.connection.handler.threads", "20")).toInt());
        this.serverChannel = ServerSocketChannel.open();
        this.connectionsByKey = new ConnectionManager$$anon$1(this);
        this.connectionsById = new ConnectionManager$$anon$2(this);
        this.messageStatuses = new HashMap<>();
        this.connectionRequests = new ConnectionManager$$anon$3(this);
        this.keyInterestChangeRequests = new SynchronizedQueue<>();
        this.sendMessageRequests = new Queue<>();
        this.futureExecContext = ExecutionContext$.MODULE$.fromExecutor(Utils$.MODULE$.newDaemonCachedThreadPool());
        this.onReceiveCallback = null;
        serverChannel().configureBlocking(false);
        serverChannel().socket().setReuseAddress(true);
        serverChannel().socket().setReceiveBufferSize(262144);
        serverChannel().socket().bind(new InetSocketAddress(i));
        serverChannel().register(selector(), 16);
        this.id = new ConnectionManagerId(Utils$.MODULE$.localHostName(), serverChannel().socket().getLocalPort());
        logInfo(new ConnectionManager$$anonfun$2(this));
        this.selectorThread = new Thread(this) { // from class: spark.network.ConnectionManager$$anon$4
            private final ConnectionManager $outer;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                this.$outer.spark$network$ConnectionManager$$run();
            }

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super("connection-manager-thread");
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }
        };
        selectorThread().setDaemon(true);
        selectorThread().start();
    }
}
