package spark.network;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.ScalaObject;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.Queue;
import scala.reflect.ScalaSignature;

/* compiled from: Connection.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%d!B\u0001\u0003\u0001\u00111!!E*f]\u0012LgnZ\"p]:,7\r^5p]*\u00111\u0001B\u0001\b]\u0016$xo\u001c:l\u0015\u0005)\u0011!B:qCJ\\7c\u0001\u0001\b\u0017A\u0011\u0001\"C\u0007\u0002\u0005%\u0011!B\u0001\u0002\u000b\u0007>tg.Z2uS>t\u0007C\u0001\u0007\u0010\u001b\u0005i!\"\u0001\b\u0002\u000bM\u001c\u0017\r\\1\n\u0005Ai!aC*dC2\fwJ\u00196fGRD\u0001B\u0005\u0001\u0003\u0006\u0004%\t\u0001F\u0001\bC\u0012$'/Z:t\u0007\u0001)\u0012!\u0006\t\u0003-mi\u0011a\u0006\u0006\u00031e\t1A\\3u\u0015\u0005Q\u0012\u0001\u00026bm\u0006L!\u0001H\f\u0003#%sW\r^*pG.,G/\u00113ee\u0016\u001c8\u000f\u0003\u0005\u001f\u0001\t\u0005\t\u0015!\u0003\u0016\u0003!\tG\r\u001a:fgN\u0004\u0003\"\u0003\u0011\u0001\u0005\u0003\u0005\u000b\u0011B\u0011*\u0003%\u0019X\r\\3di>\u0014x\f\u0005\u0002#O5\t1E\u0003\u0002%K\u0005A1\r[1o]\u0016d7O\u0003\u0002'3\u0005\u0019a.[8\n\u0005!\u001a#\u0001C*fY\u0016\u001cGo\u001c:\n\u0005)J\u0011\u0001C:fY\u0016\u001cGo\u001c:\t\u00131\u0002!\u0011!Q\u0001\n5\u0002\u0014!\u0003:f[>$X-\u00133`!\tAa&\u0003\u00020\u0005\t\u00192i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:JI&\u0011\u0011'C\u0001\u001ae\u0016lw\u000e^3D_:tWm\u0019;j_:l\u0015M\\1hKJLE\rC\u00034\u0001\u0011\u0005A'\u0001\u0004=S:LGO\u0010\u000b\u0005kY:\u0004\b\u0005\u0002\t\u0001!)!C\ra\u0001+!)\u0001E\ra\u0001C!)AF\ra\u0001[\u0019!!\b\u0001\u0001<\u0005\u0019yU\u000f\u001e2pqN\u0019\u0011\bP\u0006\u0011\u0005u\u0002U\"\u0001 \u000b\u0005}J\u0012\u0001\u00027b]\u001eL!!\u0011 
\u0003\r=\u0013'.Z2u\u0011!\u0019\u0015H!A!\u0002\u0013!\u0015\u0001\u00024bSJ\u0004\"\u0001D#\n\u0005\u0019k!aA%oi\")1'\u000fC\u0001\u0011R\u0011\u0011j\u0013\t\u0003\u0015fj\u0011\u0001\u0001\u0005\b\u0007\u001e\u0003\n\u00111\u0001E\u0011\u001di\u0015H1A\u0005\u00029\u000b\u0001\"\\3tg\u0006<Wm]\u000b\u0002\u001fB\u0019\u0001+V,\u000e\u0003ES!AU*\u0002\u000f5,H/\u00192mK*\u0011A+D\u0001\u000bG>dG.Z2uS>t\u0017B\u0001,R\u0005\u0015\tV/Z;f!\tA\u0001,\u0003\u0002Z\u0005\t9Q*Z:tC\u001e,\u0007BB.:A\u0003%q*A\u0005nKN\u001c\u0018mZ3tA!9Q,\u000fb\u0001\n\u0003q\u0016\u0001\u00053fM\u0006,H\u000e^\"ik:\\7+\u001b>f+\u0005!\u0005B\u00021:A\u0003%A)A\teK\u001a\fW\u000f\u001c;DQVt7nU5{K\u0002BqAY\u001dA\u0002\u0013\u0005a,A\noKb$X*Z:tC\u001e,Gk\u001c\"f+N,G\rC\u0004es\u0001\u0007I\u0011A3\u0002/9,\u0007\u0010^'fgN\fw-\u001a+p\u0005\u0016,6/\u001a3`I\u0015\fHC\u00014j!\taq-\u0003\u0002i\u001b\t!QK\\5u\u0011\u001dQ7-!AA\u0002\u0011\u000b1\u0001\u001f\u00132\u0011\u0019a\u0017\b)Q\u0005\t\u0006!b.\u001a=u\u001b\u0016\u001c8/Y4f)>\u0014U-V:fI\u0002BQA\\\u001d\u0005\u0002=\f!\"\u00193e\u001b\u0016\u001c8/Y4f)\t1\u0007\u000fC\u0003r[\u0002\u0007q+A\u0004nKN\u001c\u0018mZ3\t\u000bMLD\u0011\u0001;\u0002\u0011\u001d,Go\u00115v].$\u0012!\u001e\t\u0004\u0019YD\u0018BA<\u000e\u0005\u0019y\u0005\u000f^5p]B\u0011\u0001\"_\u0005\u0003u\n\u0011A\"T3tg\u0006<Wm\u00115v].DQ\u0001`\u001d\u0005\nQ\fAbZ3u\u0007\",hn\u001b$J\r>CQA`\u001d\u0005\nQ\f!bZ3u\u0007\",hn\u001b*S\u000f%\t\t\u0001AA\u0001\u0012\u000b\t\u0019!\u0001\u0004PkR\u0014w\u000e\u001f\t\u0004\u0015\u0006\u0015a\u0001\u0003\u001e\u0001\u0003\u0003E)!a\u0002\u0014\t\u0005\u0015Ah\u0003\u0005\bg\u0005\u0015A\u0011AA\u0006)\t\t\u0019\u0001\u0003\u0006\u0002\u0010\u0005\u0015\u0011\u0013!C\u0001\u0003#\ta\"\u001b8ji\u0012\"WMZ1vYR$\u0013'\u0006\u0002\u0002\u0014)\u001aA)!\u0006,\u0005\u0005]\u0001\u0003BA\r\u0003Gi!!a\u0007\u000b\t\u0005u\u0011qD\u0001\nk:\u001c\u0007.Z2lK\u0012T1!!\t\u000e\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0003K\tYBA\
tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016D\u0011\"!\u000b\u0001\u0005\u0004%\t!a\u000b\u0002\r=,HOY8y+\u0005I\u0005bBA\u0018\u0001\u0001\u0006I!S\u0001\b_V$(m\u001c=!\u0011%\t\u0019\u0004\u0001b\u0001\n\u0003\t)$\u0001\bdkJ\u0014XM\u001c;Ck\u001a4WM]:\u0016\u0005\u0005]\u0002#\u0002)\u0002:\u0005u\u0012bAA\u001e#\nY\u0011I\u001d:bs\n+hMZ3s!\u0011\ty$!\u0011\u000e\u0003\u0015J1!a\u0011&\u0005)\u0011\u0015\u0010^3Ck\u001a4WM\u001d\u0005\t\u0003\u000f\u0002\u0001\u0015!\u0003\u00028\u0005y1-\u001e:sK:$()\u001e4gKJ\u001c\b\u0005C\u0004\u0002L\u0001!\t%!\u0014\u0002!\u001d,GOU3n_R,\u0017\t\u001a3sKN\u001cH#A\u000b\t\u000f\u0005E\u0003\u0001\"\u0001\u0002T\u0005!1/\u001a8e)\r1\u0017Q\u000b\u0005\u0007c\u0006=\u0003\u0019A,\t\u000f\u0005e\u0003\u0001\"\u0001\u0002\\\u000591m\u001c8oK\u000e$H#\u00014\t\u000f\u0005}\u0003\u0001\"\u0001\u0002\\\u0005ia-\u001b8jg\"\u001cuN\u001c8fGRDq!a\u0019\u0001\t\u0003\nY&A\u0003xe&$X\rC\u0004\u0002h\u0001!\t%a\u0017\u0002\tI,\u0017\r\u001a")
/* loaded from: input_file:spark/network/SendingConnection.class */
public class SendingConnection extends Connection implements ScalaObject {
    // Remote endpoint; set once from the constructor argument and returned by getRemoteAddress().
    private final InetSocketAddress address;
    // Queue of outgoing messages; created in the constructor with fairness policy 1 and also used as the lock in send().
    private final Outbox outbox;
    // Buffers of the chunk currently being written; write() fills it from the outbox when empty and removes buffers once fully written.
    private final ArrayBuffer<ByteBuffer> currentBuffers;
    // Lazily created Scala companion module for Outbox; volatile because Outbox() reads it outside the lock.
    private volatile SendingConnection$Outbox$ Outbox$module;

    /* compiled from: Connection.scala */
    /* loaded from: input_file:spark/network/SendingConnection$Outbox.class */
    public class Outbox implements ScalaObject {
        // Fairness policy selector read by getChunk(): 0 selects getChunkFIFO(), 1 selects getChunkRR().
        private final int fair;
        // Pending messages; this queue object is also the monitor that guards every access to it.
        private final Queue<Message> messages;
        // Size argument passed to Message.getChunkForSending(); the constructor sets it to 65536.
        private final int defaultChunkSize;
        // Counter that starts at 0 and is incremented by getChunkRR() for every chunk it hands out.
        private int nextMessageToBeUsed;
        // Enclosing SendingConnection (never null, enforced by the constructor); used here for logging.
        public final SendingConnection $outer;

        /**
         * Returns the queue holding the messages that still have chunks to hand out.
         *
         * @return the live, mutable queue; callers in this class synchronize on it
         */
        public Queue<Message> messages() {
            return messages;
        }

        /**
         * Returns the size value passed to {@code Message.getChunkForSending}.
         *
         * @return the chunk size fixed by the constructor (65536)
         */
        public int defaultChunkSize() {
            return defaultChunkSize;
        }

        /**
         * Returns the counter that the round-robin path increments for every chunk it returns.
         *
         * @return the current counter value
         */
        public int nextMessageToBeUsed() {
            return nextMessageToBeUsed;
        }

        /**
         * Scala-style setter ({@code nextMessageToBeUsed = i}) for the round-robin counter.
         *
         * @param i the new counter value
         */
        public void nextMessageToBeUsed_$eq(int i) {
            nextMessageToBeUsed = i;
        }

        /**
         * Appends a message to the end of the pending queue and logs it at debug level.
         *
         * <p>The queue itself is the monitor, matching the lock taken by the chunk getters.
         *
         * @param message the message to enqueue
         */
        public void addMessage(Message message) {
            // The decompiler typed this local as Throwable and added a self-assignment;
            // the monitor is simply the message queue.
            final Queue<Message> queue = messages();
            synchronized (queue) {
                queue.enqueue(Predef$.MODULE$.wrapRefArray(new Message[]{message}));
                spark$network$SendingConnection$Outbox$$$outer().logDebug(new SendingConnection$Outbox$$anonfun$addMessage$1(this, message));
            }
        }

        /**
         * Returns the next chunk to send, chosen according to the fairness policy
         * given to the constructor.
         *
         * @return a defined option holding the next chunk, or {@code None} when no
         *         queued message has anything left to send
         * @throws IllegalStateException if the policy is neither 0 (FIFO) nor 1 (round-robin)
         */
        public Option<MessageChunk> getChunk() {
            switch (this.fair) {
                case 0:
                    return getChunkFIFO();
                case 1:
                    return getChunkRR();
                default:
                    // Unchecked on purpose: a raw checked Exception cannot be thrown from a
                    // method without a throws clause, and callers that catch Exception still see it.
                    throw new IllegalStateException("Unexpected fairness policy in outbox");
            }
        }

        /**
         * Returns the next chunk under the FIFO policy: the message at the head of the
         * queue is drained completely before the following message is touched.
         *
         * <p>The head is only peeked while it still yields chunks, and it is removed once
         * it is exhausted. The previous version re-appended the head on every chunk and
         * never removed an exhausted head, so the queue grew without bound and the loop
         * could not terminate once the head had no chunks left.
         *
         * @return a defined option holding the next chunk, or {@code None} when the queue is empty
         */
        private Option<MessageChunk> getChunkFIFO() {
            final Queue<Message> queue = messages();
            final SendingConnection connection = spark$network$SendingConnection$Outbox$$$outer();
            synchronized (queue) {
                while (!queue.isEmpty()) {
                    // Peek only: the head keeps its position until all its chunks are sent.
                    Message head = (Message) queue.apply(0);
                    Option<MessageChunk> chunk = head.getChunkForSending(defaultChunkSize());
                    if (chunk.isDefined()) {
                        if (!head.started()) {
                            connection.logDebug(new SendingConnection$Outbox$$anonfun$getChunkFIFO$1(this, head));
                            head.started_$eq(true);
                            head.startTime_$eq(System.currentTimeMillis());
                        }
                        return chunk;
                    }
                    // Head is exhausted: drop it so the loop advances to the next message.
                    queue.dequeue();
                    head.finishTime_$eq(System.currentTimeMillis());
                    connection.logDebug(new SendingConnection$Outbox$$anonfun$getChunkFIFO$2(this, head));
                }
                return None$.MODULE$;
            }
        }

        /**
         * Returns the next chunk under the round-robin policy.
         *
         * <p>The head message is dequeued and asked for a chunk. If it yields one, the
         * message goes back to the tail so the other queued messages get their turn
         * first. If it yields none, it stays removed, its finish time is recorded, and
         * the next message is tried.
         *
         * @return a defined option holding the next chunk, or {@code None} when the queue is empty
         */
        private Option<MessageChunk> getChunkRR() {
            // Keep the monitor and the logging target in separate, correctly typed locals;
            // the decompiled code reused the queue variable for the outer connection.
            final Queue<Message> queue = messages();
            final SendingConnection connection = spark$network$SendingConnection$Outbox$$$outer();
            synchronized (queue) {
                while (!queue.isEmpty()) {
                    Message candidate = (Message) queue.dequeue();
                    Option<MessageChunk> chunk = candidate.getChunkForSending(defaultChunkSize());
                    if (chunk.isDefined()) {
                        // Still has data: rotate it to the back of the queue.
                        queue.enqueue(Predef$.MODULE$.wrapRefArray(new Message[]{candidate}));
                        nextMessageToBeUsed_$eq(nextMessageToBeUsed() + 1);
                        if (!candidate.started()) {
                            connection.logDebug(new SendingConnection$Outbox$$anonfun$getChunkRR$1(this, candidate));
                            candidate.started_$eq(true);
                            candidate.startTime_$eq(System.currentTimeMillis());
                        }
                        connection.logTrace(new SendingConnection$Outbox$$anonfun$getChunkRR$2(this, candidate));
                        return chunk;
                    }
                    // Fully sent: it has already been dequeued, just record completion.
                    candidate.finishTime_$eq(System.currentTimeMillis());
                    connection.logDebug(new SendingConnection$Outbox$$anonfun$getChunkRR$3(this, candidate));
                }
                return None$.MODULE$;
            }
        }

        /**
         * Compiler-generated accessor for the enclosing connection.
         *
         * @return the {@code SendingConnection} this outbox belongs to
         */
        public SendingConnection spark$network$SendingConnection$Outbox$$$outer() {
            return $outer;
        }

        /**
         * Creates an empty outbox bound to the given connection.
         *
         * @param sendingConnection the enclosing connection; must not be null
         * @param i fairness policy: 0 selects FIFO, 1 selects round-robin (see {@code getChunk})
         * @throws NullPointerException if {@code sendingConnection} is null
         */
        public Outbox(SendingConnection sendingConnection, int i) {
            // Reject a missing enclosing instance before any state is set up.
            if (sendingConnection == null) {
                throw new NullPointerException();
            }
            $outer = sendingConnection;
            fair = i;
            messages = new Queue<>();
            defaultChunkSize = 65536;
            nextMessageToBeUsed = 0;
        }
    }

    /**
     * Returns the remote socket address supplied to the constructor.
     *
     * @return the address this connection targets
     */
    public InetSocketAddress address() {
        return address;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v4 */
    /* JADX WARN: Type inference failed for: r0v5, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8 */
    /* JADX WARN: Type inference failed for: r1v2, types: [spark.network.SendingConnection$Outbox$] */
    /**
     * Returns the companion module object for {@code Outbox}, creating it on first use.
     *
     * <p>The volatile field is checked without the lock, then re-checked while
     * synchronized on this connection, so the instance is created at most once.
     *
     * <p>NOTE(review): this body is raw decompiler output. The {@code ??} type
     * placeholder and the anonymous {@code ScalaObject} with a constructor argument
     * are not valid Java; the real class is {@code SendingConnection$Outbox$} and
     * its constructor is not visible here.
     *
     * @return the lazily created module instance
     */
    public final SendingConnection$Outbox$ Outbox() {
        // Fast path: skip the lock once the module exists.
        if (this.Outbox$module == null) {
            ?? r0 = this;
            synchronized (r0) {
                // Re-check under the lock; another thread may have created it meanwhile.
                if (this.Outbox$module == null) {
                    this.Outbox$module = new ScalaObject(this) { // from class: spark.network.SendingConnection$Outbox$
                        // Presumably the default for the Outbox constructor's first parameter
                        // (the fairness policy), going by Scala's default-argument naming — confirm.
                        public int init$default$1() {
                            return 0;
                        }
                    };
                }
                r0 = this;
            }
        }
        return this.Outbox$module;
    }

    /**
     * Returns the outbox holding the messages queued on this connection.
     *
     * @return the outbox created by the constructor
     */
    public Outbox outbox() {
        return outbox;
    }

    /**
     * Returns the buffers of the chunk that is currently being written to the channel.
     *
     * @return the live, mutable buffer list
     */
    public ArrayBuffer<ByteBuffer> currentBuffers() {
        return currentBuffers;
    }

    /**
     * Returns the remote address of this connection, which is the address it was
     * constructed with.
     *
     * @return the target socket address
     */
    @Override // spark.network.Connection
    public InetSocketAddress getRemoteAddress() {
        final InetSocketAddress remote = address();
        return remote;
    }

    /**
     * Queues a message for sending and, if the channel is already connected, updates
     * the selection-key interest so the message gets picked up.
     *
     * <p>The whole operation runs while holding the outbox monitor.
     *
     * @param message the message to queue
     */
    public void send(Message message) {
        final Outbox pending = outbox();
        synchronized (pending) {
            pending.addMessage(message);
            if (!channel().isConnected()) {
                // Not connected yet: nothing more to do here.
                return;
            }
            // NOTE(review): 5 equals SelectionKey.OP_READ | SelectionKey.OP_WRITE (1 | 4),
            // assuming the helper takes an interest-ops mask — confirm against Connection.
            changeConnectionKeyInterest(5);
        }
    }

    /**
     * Starts connecting the channel to the remote address and registers the channel
     * with the selector.
     *
     * <p>Any exception is logged as an error and forwarded to the exception callback;
     * it is not rethrown.
     */
    public void connect() {
        try {
            channel().connect(address());
            // 8 is the value of SelectionKey.OP_CONNECT.
            channel().register(selector(), 8);
            logInfo(new SendingConnection$$anonfun$connect$1(this));
        } catch (Exception failure) {
            logError(new SendingConnection$$anonfun$connect$2(this), failure);
            callOnExceptionCallback(failure);
        }
    }

    /**
     * Completes a pending connect on the channel and then updates the selection-key
     * interest.
     *
     * <p>Any exception is logged as a warning and forwarded to the exception callback;
     * it is not rethrown.
     */
    public void finishConnect() {
        try {
            channel().finishConnect();
            // NOTE(review): 5 equals SelectionKey.OP_READ | SelectionKey.OP_WRITE (1 | 4),
            // assuming the helper takes an interest-ops mask — confirm against Connection.
            changeConnectionKeyInterest(5);
            logInfo(new SendingConnection$$anonfun$finishConnect$1(this));
        } catch (Exception failure) {
            logWarning(new SendingConnection$$anonfun$finishConnect$2(this), failure);
            callOnExceptionCallback(failure);
        }
    }

    /**
     * Writes as much queued data to the channel as it will currently accept.
     *
     * <p>Reconstructed from the bytecode instruction dump that the decompiler left in
     * place of this method (the generated stub unconditionally threw
     * {@code UnsupportedOperationException}). The loop repeats two steps:
     * <ol>
     *   <li>If no buffers are pending, take the next chunk from the outbox while holding
     *       the outbox monitor. If there is no chunk, change the key interest to 1 and
     *       return.</li>
     *   <li>Write the first pending buffer. Remove it once it has no bytes remaining.
     *       If the channel accepted fewer bytes than the buffer held, return and wait
     *       for the next write event.</li>
     * </ol>
     *
     * <p>Any exception is logged as a warning, passed to the exception callback, and the
     * connection is closed; nothing is rethrown. The bytecode matched on
     * {@code Some}/{@code None} and raised {@code MatchError} otherwise; the
     * {@code isDefined()} test used here is equivalent for every non-null option, and a
     * null option still ends in the same catch block.
     */
    @Override // spark.network.Connection
    public void write() {
        try {
            while (true) {
                if (currentBuffers().size() == 0) {
                    final Outbox pending = outbox();
                    synchronized (pending) {
                        final Option<MessageChunk> next = pending.getChunk();
                        if (!next.isDefined()) {
                            // Nothing left to send. NOTE(review): 1 equals SelectionKey.OP_READ,
                            // assuming the helper takes an interest-ops mask — confirm.
                            changeConnectionKeyInterest(1);
                            return;
                        }
                        currentBuffers().$plus$plus$eq(next.get().buffers());
                    }
                }
                if (currentBuffers().size() > 0) {
                    final ByteBuffer buffer = (ByteBuffer) currentBuffers().apply(0);
                    final int remainingBefore = buffer.remaining();
                    final int written = channel().write(buffer);
                    if (buffer.remaining() == 0) {
                        currentBuffers().$minus$eq(buffer);
                    }
                    if (written < remainingBefore) {
                        // Partial write: the socket's send buffer is full for now.
                        return;
                    }
                }
            }
        } catch (Exception e) {
            logWarning(new SendingConnection$$anonfun$write$1(this), e);
            callOnExceptionCallback(e);
            close();
        }
    }

    /**
     * Reads at most one byte from the channel into a throwaway buffer.
     *
     * <p>A result of -1 (end of stream) closes the connection. A positive result is
     * logged as a warning. Any exception is logged as an error, forwarded to the
     * exception callback, and the connection is closed.
     */
    @Override // spark.network.Connection
    public void read() {
        try {
            final ByteBuffer probe = ByteBuffer.allocate(1);
            final int bytesRead = channel().read(probe);
            if (bytesRead > 0) {
                // Data actually arrived on this connection; only a warning is logged.
                logWarning(new SendingConnection$$anonfun$read$1(this));
            } else if (bytesRead == -1) {
                // End of stream.
                close();
            }
        } catch (Exception failure) {
            logError(new SendingConnection$$anonfun$read$2(this), failure);
            callOnExceptionCallback(failure);
            close();
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /**
     * Creates a sending connection on a newly opened socket channel.
     *
     * <p>The channel is only opened here; connecting is done by {@code connect()}.
     * The outbox is created with fairness policy 1 (round-robin).
     * {@code SocketChannel.open()} can throw {@code IOException}, which this
     * Scala-derived signature does not declare.
     *
     * @param inetSocketAddress the remote address to connect to
     * @param selector the selector passed to the superclass constructor
     * @param connectionManagerId the remote id passed to the superclass constructor
     */
    public SendingConnection(InetSocketAddress inetSocketAddress, Selector selector, ConnectionManagerId connectionManagerId) {
        super(SocketChannel.open(), selector, connectionManagerId);
        currentBuffers = new ArrayBuffer<>();
        address = inetSocketAddress;
        outbox = new Outbox(this, 1);
    }
}
