/*
 * Decompiled with CFR 0.152.
 */
package org.rapidoid.net.impl;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import javax.net.ssl.SSLContext;
import org.rapidoid.buffer.Buf;
import org.rapidoid.buffer.BufGroup;
import org.rapidoid.buffer.BufUtil;
import org.rapidoid.buffer.IncompleteReadException;
import org.rapidoid.collection.Coll;
import org.rapidoid.config.Conf;
import org.rapidoid.config.ConfigUtil;
import org.rapidoid.ctx.Ctxs;
import org.rapidoid.expire.ExpirationCrawlerThread;
import org.rapidoid.expire.Expire;
import org.rapidoid.insight.Insights;
import org.rapidoid.insight.StatsMeasure;
import org.rapidoid.log.Log;
import org.rapidoid.net.NetworkingParams;
import org.rapidoid.net.Protocol;
import org.rapidoid.net.impl.AbstractEventLoop;
import org.rapidoid.net.impl.ChannelHolderImpl;
import org.rapidoid.net.impl.ConnState;
import org.rapidoid.net.impl.ConnectionTarget;
import org.rapidoid.net.impl.NetWorker;
import org.rapidoid.net.impl.ProtocolException;
import org.rapidoid.net.impl.RapidoidChannel;
import org.rapidoid.net.impl.RapidoidConnection;
import org.rapidoid.net.impl.RapidoidHelper;
import org.rapidoid.pool.Pool;
import org.rapidoid.pool.Pools;
import org.rapidoid.u.U;
import org.rapidoid.util.SimpleList;

public class ExtendedWorker
extends AbstractEventLoop<ExtendedWorker>
implements NetWorker {
    public static int MAX_IO_WORKERS = 1024;
    public static boolean EXTRA_SAFE = false;
    private static final ExpirationCrawlerThread idleConnectionsCrawler;
    private static final int connTimeout;
    private final Queue<RapidoidChannel> connected;
    private final SimpleList<RapidoidConnection> done;
    private final Queue<RapidoidConnection> restarting;
    private final Queue<ConnectionTarget> connecting;
    private final Pool<RapidoidConnection> connections;
    private final Set<RapidoidConnection> allConnections = Coll.concurrentSet();
    private final long maxPipeline;
    private final int selectorTimeout = 10;
    final Protocol serverProtocol;
    final RapidoidHelper helper;
    private final int bufSize;
    private final long bufSizeLimit;
    private final boolean noDelay;
    private final BufGroup bufs;
    private volatile long messagesProcessed;
    private final SSLContext sslContext;
    private final StatsMeasure dataIn;
    private final StatsMeasure dataOut;

    public ExtendedWorker(String name, RapidoidHelper helper, NetworkingParams net, SSLContext sslContext) {
        super(name);
        this.bufSize = net.bufSizeKB() * 1024;
        this.noDelay = net.noDelay();
        this.bufs = new BufGroup(this.bufSize, net.syncBufs());
        this.bufSizeLimit = 1024L * (long)((Integer)Conf.NET.entry("bufSizeLimit").or((Object)1024)).intValue();
        this.serverProtocol = net.protocol();
        this.helper = helper;
        this.sslContext = sslContext;
        this.maxPipeline = net.maxPipeline();
        int queueSize = ConfigUtil.micro() ? 1000 : 1000000;
        int growFactor = ConfigUtil.micro() ? 2 : 10;
        this.restarting = new ArrayBlockingQueue<RapidoidConnection>(queueSize);
        this.connecting = new ArrayBlockingQueue<ConnectionTarget>(queueSize);
        this.connected = new ArrayBlockingQueue<RapidoidChannel>(queueSize);
        this.done = new SimpleList(queueSize / 10, growFactor);
        this.dataIn = Insights.stats((String)(name + ":datain"));
        this.dataOut = Insights.stats((String)(name + ":dataout"));
        this.connections = Pools.create((String)"connections", (Callable)new Callable<RapidoidConnection>(){

            @Override
            public RapidoidConnection call() throws Exception {
                return ExtendedWorker.this.newConnection(false);
            }
        }, (int)100000);
        if (idleConnectionsCrawler != null) {
            idleConnectionsCrawler.register(this.allConnections);
        }
    }

    @Override
    public void accept(SocketChannel socketChannel) throws IOException {
        this.configureSocket(socketChannel);
        this.connected.add(new RapidoidChannel(socketChannel, false, this.serverProtocol));
        this.selector.wakeup();
    }

    public void connect(ConnectionTarget target) throws IOException {
        this.configureSocket(target.socketChannel);
        this.connecting.add(target);
        if (target.socketChannel.connect(target.addr)) {
            Log.info((String)"Opened socket, connected", (String)"address", (Object)target.addr);
        } else {
            Log.info((String)"Opened socket, connecting...", (String)"address", (Object)target.addr);
        }
        this.selector.wakeup();
    }

    private void configureSocket(SocketChannel socketChannel) throws IOException, SocketException {
        socketChannel.configureBlocking(false);
        Socket socket = socketChannel.socket();
        socket.setTcpNoDelay(this.noDelay);
        socket.setReceiveBufferSize(this.bufSize);
        socket.setSendBufferSize(this.bufSize);
        socket.setReuseAddress(true);
    }

    @Override
    protected void connectOP(SelectionKey key) throws IOException {
        U.must((boolean)key.isConnectable());
        SocketChannel socketChannel = (SocketChannel)key.channel();
        if (!socketChannel.isConnectionPending()) {
            return;
        }
        ConnectionTarget target = (ConnectionTarget)((Object)key.attachment());
        try {
            boolean ready = socketChannel.finishConnect();
            U.must((boolean)ready, (String)"Expected an established connection!");
            Log.info((String)"Connected", (String)"address", (Object)target.addr);
            this.connected.add(new RapidoidChannel(socketChannel, true, target.protocol, target.holder, target.reconnecting, target.state));
        }
        catch (ConnectException e) {
            this.retryConnecting(target);
        }
    }

    private void retryConnecting(ConnectionTarget target) throws IOException {
        Log.warn((String)"Reconnecting...", (String)"address", (Object)target.addr);
        target.socketChannel = SocketChannel.open();
        target.retryAfter = U.time() + 1000L;
        this.connect(target);
    }

    @Override
    protected void readOP(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel)key.channel();
        RapidoidConnection conn = (RapidoidConnection)key.attachment();
        this.readInto(socketChannel, conn);
        this.process(conn);
        if (conn.closing) {
            if (conn.autoReconnect()) {
                this.reconnect(conn);
            } else {
                this.close(key);
            }
        }
    }

    private void readInto(SocketChannel socketChannel, RapidoidConnection conn) {
        int read;
        try {
            read = conn.hasTLS ? (conn.tls.netIn.hasRemaining() ? socketChannel.read(conn.tls.netIn) : 0) : ((long)conn.input.size() < this.bufSizeLimit ? conn.input.append((ReadableByteChannel)socketChannel) : 0);
        }
        catch (Exception e) {
            Log.debug((String)"Connection error", (Throwable)e);
            read = -1;
        }
        if (read == -1) {
            Log.debug((String)"The connection was closed!");
            conn.closing = true;
            if (conn.hasTLS) {
                conn.tls.closeInbound();
            }
        } else {
            boolean success;
            if (conn.hasTLS && read > 0 && (success = conn.tls.unwrapInput())) {
                this.wantToWrite(conn);
            }
            this.dataIn.value((long)read);
        }
    }

    private void reconnect(RapidoidConnection conn) throws IOException {
        SelectionKey key = conn.key;
        InetSocketAddress addr = conn.getAddress();
        Protocol protocol = conn.getProtocol();
        ChannelHolderImpl holder = conn.getHolder();
        ConnState state = conn.state().copy();
        holder.closed();
        this.close(key);
        this.retryConnecting(new ConnectionTarget(null, addr, protocol, holder, true, state));
    }

    @Override
    public void process(RapidoidConnection conn) {
        this.messagesProcessed += this.processMsgs(conn);
        conn.completedInputPos = conn.input.position();
    }

    private long processMsgs(RapidoidConnection conn) {
        long reqN;
        for (reqN = 0L; (reqN < this.maxPipeline || this.maxPipeline <= 0L) && conn.input().hasRemaining() && this.processNext(conn, false, false); ++reqN) {
        }
        this.touch(conn);
        return reqN;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean processNext(RapidoidConnection conn, boolean initial, boolean write) {
        long seq;
        U.must((initial || write || conn.input().hasRemaining() ? 1 : 0) != 0);
        if (initial) {
            seq = 0L;
            conn.requestId = -1L;
        } else {
            seq = conn.readSeq.incrementAndGet();
            conn.requestId = this.helper.requestIdGen;
            this.helper.requestIdGen += (long)MAX_IO_WORKERS;
            ++this.helper.requestCounter;
        }
        conn.input().checkpoint(conn.input().position());
        int limit = conn.input().limit();
        int osize = conn.output().size();
        BufUtil.doneWriting((Buf)conn.input());
        ConnState state = conn.state();
        long stateN = state.n;
        Object stateObj = state.obj;
        try {
            conn.done = false;
            conn.async = false;
            if (EXTRA_SAFE) {
                this.processNextExtraSafe(conn);
            } else {
                Protocol protocol = conn.getProtocol();
                if (protocol == null) {
                    boolean bl = false;
                    return bl;
                }
                protocol.process(conn);
            }
            BufUtil.startWriting((Buf)conn.input());
            if (!conn.isAsync()) {
                if (!conn.closed) {
                    conn.done();
                }
                conn.processedSeq(seq);
            }
            conn.input().deleteBefore(conn.input().checkpoint());
            boolean protocol = true;
            return protocol;
        }
        catch (IncompleteReadException e) {
            conn.log("<< ROLLBACK >>");
            conn.input().position(conn.input().checkpoint());
            conn.input().limit(limit);
            BufUtil.startWriting((Buf)conn.input());
            state.n = stateN;
            state.obj = stateObj;
            boolean decreased = conn.readSeq.compareAndSet(seq, seq - 1L);
            U.must((boolean)decreased, (String)"Error in the request order control! Handle: %s", (long)seq);
        }
        catch (ProtocolException e) {
            conn.log("<< PROTOCOL ERROR >>");
            Log.warn((String)"Protocol error", (String)"error", (Object)e);
            conn.output().deleteAfter(osize);
            conn.write((String)U.or((Object)e.getMessage(), (Object)"Protocol error!"));
            conn.error();
            conn.processedSeq(seq);
            conn.close(true);
        }
        catch (Throwable e) {
            conn.log("<< ERROR >>");
            Log.error((String)"Failed to process message!", (Throwable)e);
            conn.processedSeq(seq);
            conn.close(true);
        }
        finally {
            conn.input().checkpoint(-1);
        }
        return false;
    }

    private void processNextExtraSafe(RapidoidConnection conn) {
        if (Ctxs.hasContext()) {
            Log.warn((String)"Detected unclosed context before processing message!");
            Ctxs.close();
        }
        try {
            conn.getProtocol().process(conn);
        }
        finally {
            if (Ctxs.hasContext()) {
                Log.warn((String)"Detected unclosed context after processing message!");
                Ctxs.close();
            }
        }
    }

    @Override
    public void close(RapidoidConnection conn) {
        this.close(conn.key);
    }

    private void close(SelectionKey key) {
        try {
            if (key != null) {
                RapidoidConnection conn;
                Object attachment = key.attachment();
                this.clearKey(key);
                if (attachment instanceof RapidoidConnection && (conn = (RapidoidConnection)attachment) != null && !conn.closed) {
                    Log.trace((String)"Closing connection", (String)"connection", (Object)conn);
                    assert (conn.key == key);
                    conn.reset();
                    this.connections.release((Object)conn);
                }
            }
        }
        catch (IOException e) {
            Log.warn((String)"Error while closing connection!", (Throwable)e);
        }
    }

    private void clearKey(SelectionKey key) throws IOException {
        if (key.isValid()) {
            SocketChannel socketChannel = (SocketChannel)key.channel();
            socketChannel.close();
            key.attach(null);
            key.cancel();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void writeOP(SelectionKey key) throws IOException {
        RapidoidConnection conn = (RapidoidConnection)key.attachment();
        SocketChannel socketChannel = (SocketChannel)key.channel();
        this.checkOnSameThread();
        this.touch(conn);
        try {
            RapidoidConnection rapidoidConnection = conn;
            synchronized (rapidoidConnection) {
                Buf buf = conn.outgoing;
                synchronized (buf) {
                    if (conn.hasTLS) {
                        Buf buf2 = conn.output;
                        synchronized (buf2) {
                            conn.tls.wrapToOutgoing();
                        }
                    }
                    this.writeOp(key, conn, socketChannel);
                }
            }
        }
        catch (IOException e) {
            this.close(conn);
        }
        catch (CancelledKeyException cke) {
            Log.debug((String)"Tried to write on canceled selector key!");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writeOp(SelectionKey key, RapidoidConnection conn, SocketChannel socketChannel) throws IOException {
        boolean closeAfterWrite;
        boolean finishedWriting;
        Buf buf = conn.outgoing;
        synchronized (buf) {
            if (conn.outgoing.hasRemaining()) {
                conn.log("WRITING");
                BufUtil.startWriting((Buf)conn.outgoing);
                int wrote = conn.outgoing.writeTo((WritableByteChannel)socketChannel);
                conn.outgoing.deleteBefore(wrote);
                BufUtil.doneWriting((Buf)conn.outgoing);
                conn.log("DONE WRITING");
            }
        }
        RapidoidConnection rapidoidConnection = conn;
        synchronized (rapidoidConnection) {
            finishedWriting = conn.finishedWriting();
            closeAfterWrite = conn.closeAfterWrite();
        }
        if (finishedWriting && closeAfterWrite) {
            this.close(conn);
        } else {
            if (finishedWriting) {
                key.interestOps(conn.mode != 0 ? conn.mode : conn.nextOp);
                this.processNext(conn, false, true);
            } else {
                key.interestOps(conn.mode != 0 ? conn.mode : 5);
            }
            conn.wrote(finishedWriting);
        }
    }

    @Override
    public void wantToWrite(RapidoidConnection conn) {
        U.must((conn.mode != 1 ? 1 : 0) != 0);
        this.touch(conn);
        if (this.onSameThread()) {
            conn.key.interestOps(4);
        } else {
            this.wantToWriteAsync(conn);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void wantToWriteAsync(RapidoidConnection conn) {
        this.touch(conn);
        SimpleList<RapidoidConnection> simpleList = this.done;
        synchronized (simpleList) {
            this.done.add((Object)conn);
        }
        this.selector.wakeup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void doProcessing() {
        RapidoidConnection restartedConn;
        RapidoidChannel channel;
        SelectionKey newKey;
        long now = U.time();
        int connectingN = this.connecting.size();
        for (int i = 0; i < connectingN; ++i) {
            ConnectionTarget target = this.connecting.poll();
            assert (target != null);
            if (target.retryAfter < now) {
                Log.debug((String)"connecting", (String)"address", (Object)target.addr);
                try {
                    newKey = target.socketChannel.register(this.selector, 8);
                    newKey.attach((Object)target);
                }
                catch (ClosedChannelException e) {
                    Log.warn((String)"Closed channel", (Throwable)e);
                }
                continue;
            }
            this.connecting.add(target);
        }
        while ((channel = this.connected.poll()) != null) {
            SocketChannel socketChannel = channel.socketChannel;
            Log.debug((String)"connected", (String)"address", (Object)socketChannel.socket().getRemoteSocketAddress());
            try {
                newKey = socketChannel.register(this.selector, 1);
                U.notNull((Object)channel.protocol, (String)"protocol", (Object[])new Object[0]);
                RapidoidConnection conn = this.attachConn(newKey, channel.protocol);
                conn.setClient(channel.isClient);
                conn.autoReconnect(channel.autoreconnecting);
                this.bindChannelToHolder(conn, channel.holder);
                if (channel.state != null) {
                    conn.state().copyFrom(channel.state);
                }
                try {
                    this.processNext(conn, true, false);
                }
                finally {
                    conn.setInitial(false);
                }
            }
            catch (ClosedChannelException e) {
                Log.warn((String)"Closed channel", (Throwable)e);
            }
        }
        while ((restartedConn = this.restarting.poll()) != null) {
            Log.debug((String)"restarting", (String)"connection", (Object)restartedConn);
            this.processNext(restartedConn, true, false);
        }
        SimpleList<RapidoidConnection> simpleList = this.done;
        synchronized (simpleList) {
            for (int i = 0; i < this.done.size(); ++i) {
                RapidoidConnection conn = (RapidoidConnection)this.done.get(i);
                if (conn.key == null || !conn.key.isValid()) continue;
                conn.key.interestOps(4);
            }
            this.done.clear();
        }
    }

    private void bindChannelToHolder(RapidoidConnection conn, ChannelHolderImpl holder) {
        conn.setHolder(holder);
        if (holder != null) {
            holder.setChannel(conn);
        }
    }

    private RapidoidConnection attachConn(SelectionKey key, Protocol protocol) {
        U.notNull((Object)key, (String)"protocol", (Object[])new Object[0]);
        U.notNull((Object)protocol, (String)"protocol", (Object[])new Object[0]);
        Object attachment = key.attachment();
        assert (attachment == null || attachment instanceof ConnectionTarget);
        RapidoidConnection conn = (RapidoidConnection)this.connections.get();
        conn.reset();
        U.must((boolean)conn.closed);
        conn.closed = false;
        conn.key = key;
        conn.setProtocol(protocol);
        key.attach(conn);
        this.touch(conn);
        return conn;
    }

    private void touch(RapidoidConnection conn) {
        conn.setExpiresAt(this.approxTime + (long)connTimeout);
    }

    @Override
    protected void failedOP(SelectionKey key, Throwable e) {
        Log.error((String)"Network error", (Throwable)e);
        this.close(key);
    }

    @Override
    public RapidoidConnection newConnection(boolean client) {
        RapidoidConnection conn = new RapidoidConnection(this, this.bufs);
        this.allConnections.add(conn);
        return conn;
    }

    @Override
    public long getMessagesProcessed() {
        return this.messagesProcessed;
    }

    @Override
    protected synchronized void stopLoop() {
        super.stopLoop();
        this.done.clear();
        this.connected.clear();
        this.connections.clear();
        this.bufs.clear();
    }

    @Override
    public synchronized ExtendedWorker shutdown() {
        this.stopLoop();
        this.waitToStop();
        return this;
    }

    @Override
    public SSLContext sslContext() {
        return this.sslContext;
    }

    public void restart(RapidoidConnection conn) {
        this.restarting.add(conn);
    }

    @Override
    protected long getSelectorTimeout() {
        return 10L;
    }

    @Override
    public RapidoidHelper helper() {
        return this.helper;
    }

    static {
        int timeoutResolution = (Integer)Conf.HTTP.entry("timeoutResolution").or((Object)5000);
        connTimeout = (Integer)Conf.HTTP.entry("timeout").or((Object)30000);
        idleConnectionsCrawler = timeoutResolution > 0 && connTimeout > 0 ? Expire.crawler((String)"idleConnections", (int)timeoutResolution) : null;
    }
}

