package org.apache.qpid.server.transport;

import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.server.configuration.CommonProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/qpid/server/transport/SelectorThread.class */
public class SelectorThread extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class);
    private static final long ACCEPT_CANCELLATION_TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, 60000).intValue();
    static final String IO_THREAD_NAME_PREFIX = "IO-";
    private final NetworkConnectionScheduler _scheduler;
    private SelectionTask[] _selectionTasks;
    private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue();
    private final AtomicBoolean _closed = new AtomicBoolean();
    private final BlockingQueue<Runnable> _workQueue = new LinkedBlockingQueue();
    private final AtomicInteger _nextSelectorTaskIndex = new AtomicInteger();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/server/transport/SelectorThread$ConnectionProcessor.class */
    public static final class ConnectionProcessor implements Runnable {
        private final NetworkConnectionScheduler _scheduler;
        private final NonBlockingConnection _connection;
        private AtomicBoolean _running = new AtomicBoolean();

        public ConnectionProcessor(NetworkConnectionScheduler networkConnectionScheduler, NonBlockingConnection nonBlockingConnection) {
            this._scheduler = networkConnectionScheduler;
            this._connection = nonBlockingConnection;
        }

        @Override // java.lang.Runnable
        public void run() {
            this._scheduler.incrementRunningCount();
            try {
                processConnection();
            } finally {
                this._scheduler.decrementRunningCount();
            }
        }

        public void processConnection() {
            if (this._running.compareAndSet(false, true)) {
                this._scheduler.processConnection(this._connection);
            }
        }
    }

    /* loaded from: input_file:org/apache/qpid/server/transport/SelectorThread$SelectionTask.class */
    public final class SelectionTask implements Runnable {
        private final Selector _selector;
        private final AtomicBoolean _selecting;
        private final AtomicBoolean _inSelect;
        private final AtomicInteger _wakeups;
        private long _nextTimeout;
        private final Queue<NonBlockingConnection> _unregisteredConnections;
        private final Set<NonBlockingConnection> _unscheduledConnections;

        private SelectionTask() throws IOException {
            this._selecting = new AtomicBoolean();
            this._inSelect = new AtomicBoolean();
            this._wakeups = new AtomicInteger();
            this._unregisteredConnections = new ConcurrentLinkedQueue();
            this._unscheduledConnections = new HashSet();
            this._selector = Selector.open();
        }

        @Override // java.lang.Runnable
        public void run() {
            performSelect();
        }

        public boolean acquireSelecting() {
            return this._selecting.compareAndSet(false, true);
        }

        public void clearSelecting() {
            this._selecting.set(false);
        }

        public Selector getSelector() {
            return this._selector;
        }

        public Queue<NonBlockingConnection> getUnregisteredConnections() {
            return this._unregisteredConnections;
        }

        public Set<NonBlockingConnection> getUnscheduledConnections() {
            return this._unscheduledConnections;
        }

        private List<NonBlockingConnection> processUnscheduledConnections() {
            this._nextTimeout = 2147483647L;
            if (getUnscheduledConnections().isEmpty()) {
                return Collections.emptyList();
            }
            ArrayList arrayList = new ArrayList();
            long currentTimeMillis = System.currentTimeMillis();
            Iterator<NonBlockingConnection> it = getUnscheduledConnections().iterator();
            while (it.hasNext()) {
                NonBlockingConnection next = it.next();
                AggregateTicker ticker = next.getTicker();
                int timeToNextTick = ticker.getTimeToNextTick(currentTimeMillis);
                ticker.resetModified();
                if (timeToNextTick <= 0 || next.isStateChanged()) {
                    arrayList.add(next);
                    try {
                        next.getSocketChannel().register(this._selector, 0, next);
                    } catch (CancelledKeyException | ClosedChannelException e) {
                        SelectorThread.LOGGER.debug("Failed to register with selector for connection " + next + ". Connection is probably being closed by peer.", e);
                    }
                    it.remove();
                } else {
                    this._nextTimeout = Math.min(timeToNextTick, this._nextTimeout);
                }
            }
            return arrayList.isEmpty() ? Collections.emptyList() : arrayList;
        }

        private List<NonBlockingConnection> processSelectionKeys() {
            Set<SelectionKey> selectedKeys = this._selector.selectedKeys();
            if (selectedKeys.isEmpty()) {
                return Collections.emptyList();
            }
            ArrayList arrayList = new ArrayList();
            for (SelectionKey selectionKey : selectedKeys) {
                if (selectionKey.isAcceptable()) {
                    final NonBlockingNetworkTransport nonBlockingNetworkTransport = (NonBlockingNetworkTransport) selectionKey.attachment();
                    final ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                    final SocketAddress localSocketAddress = serverSocketChannel.socket().getLocalSocketAddress();
                    try {
                        serverSocketChannel.register(this._selector, 0, nonBlockingNetworkTransport);
                    } catch (ClosedChannelException e) {
                        SelectorThread.LOGGER.error("Failed to register selector on accepting port {} ", localSocketAddress, e);
                    }
                    SelectorThread.this._workQueue.add(new Runnable() { // from class: org.apache.qpid.server.transport.SelectorThread.SelectionTask.1
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                try {
                                    SelectorThread.this._scheduler.incrementRunningCount();
                                    nonBlockingNetworkTransport.acceptSocketChannel(serverSocketChannel);
                                    serverSocketChannel.register(SelectionTask.this._selector, 16, nonBlockingNetworkTransport);
                                    SelectionTask.this.wakeup();
                                } catch (ClosedSelectorException e2) {
                                    SelectorThread.LOGGER.info("Failed to register selector on accepting port {} because selector is already closed. This is probably a harmless race-condition (QPID-7399)", localSocketAddress);
                                } catch (ClosedChannelException e3) {
                                    SelectorThread.LOGGER.error("Failed to register selector on accepting port {}", localSocketAddress, e3);
                                } finally {
                                    SelectorThread.this._scheduler.decrementRunningCount();
                                }
                            } catch (Throwable th) {
                                try {
                                    try {
                                        serverSocketChannel.register(SelectionTask.this._selector, 16, nonBlockingNetworkTransport);
                                        SelectionTask.this.wakeup();
                                    } catch (Throwable th2) {
                                        throw th2;
                                    }
                                } catch (ClosedChannelException e4) {
                                    SelectorThread.LOGGER.error("Failed to register selector on accepting port {}", localSocketAddress, e4);
                                    SelectorThread.this._scheduler.decrementRunningCount();
                                } catch (ClosedSelectorException e5) {
                                    SelectorThread.LOGGER.info("Failed to register selector on accepting port {} because selector is already closed. This is probably a harmless race-condition (QPID-7399)", localSocketAddress);
                                    SelectorThread.this._scheduler.decrementRunningCount();
                                }
                                throw th;
                            }
                        }
                    });
                } else {
                    NonBlockingConnection nonBlockingConnection = (NonBlockingConnection) selectionKey.attachment();
                    if (nonBlockingConnection != null) {
                        try {
                            selectionKey.channel().register(this._selector, 0, nonBlockingConnection);
                        } catch (ClosedChannelException e2) {
                        }
                        arrayList.add(nonBlockingConnection);
                        getUnscheduledConnections().remove(nonBlockingConnection);
                    }
                }
            }
            selectedKeys.clear();
            return arrayList;
        }

        private List<NonBlockingConnection> reregisterUnregisteredConnections() {
            if (getUnregisteredConnections().isEmpty()) {
                return Collections.emptyList();
            }
            ArrayList arrayList = new ArrayList();
            while (true) {
                NonBlockingConnection poll = getUnregisteredConnections().poll();
                if (poll == null) {
                    break;
                }
                getUnscheduledConnections().add(poll);
                try {
                    poll.getSocketChannel().register(this._selector, (poll.wantsRead() ? 1 : 0) | (poll.wantsWrite() ? 4 : 0), poll);
                } catch (ClosedChannelException e) {
                    arrayList.add(poll);
                }
            }
            return arrayList.isEmpty() ? Collections.emptyList() : arrayList;
        }

        /* JADX WARN: Finally extract failed */
        private void performSelect() {
            SelectorThread.this._scheduler.incrementRunningCount();
            while (!SelectorThread.this._closed.get() && acquireSelecting()) {
                try {
                    ArrayList arrayList = new ArrayList();
                    try {
                        if (!SelectorThread.this._closed.get()) {
                            Thread.currentThread().setName(SelectorThread.this._scheduler.getSelectorThreadName());
                            this._inSelect.set(true);
                            try {
                                try {
                                    if (this._wakeups.getAndSet(0) > 0) {
                                        this._selector.selectNow();
                                    } else {
                                        this._selector.select(this._nextTimeout);
                                    }
                                    this._inSelect.set(false);
                                    for (NonBlockingConnection nonBlockingConnection : processSelectionKeys()) {
                                        if (nonBlockingConnection.setScheduled()) {
                                            arrayList.add(new ConnectionProcessor(SelectorThread.this._scheduler, nonBlockingConnection));
                                        }
                                    }
                                    for (NonBlockingConnection nonBlockingConnection2 : reregisterUnregisteredConnections()) {
                                        if (nonBlockingConnection2.setScheduled()) {
                                            arrayList.add(new ConnectionProcessor(SelectorThread.this._scheduler, nonBlockingConnection2));
                                        }
                                    }
                                    for (NonBlockingConnection nonBlockingConnection3 : processUnscheduledConnections()) {
                                        if (nonBlockingConnection3.setScheduled()) {
                                            arrayList.add(new ConnectionProcessor(SelectorThread.this._scheduler, nonBlockingConnection3));
                                        }
                                    }
                                    SelectorThread.this.runTasks();
                                } catch (IOException e) {
                                    SelectorThread.LOGGER.error("Failed to trying to select()", e);
                                    closeSelector();
                                    clearSelecting();
                                    SelectorThread.this._scheduler.decrementRunningCount();
                                    return;
                                }
                            } finally {
                                this._inSelect.set(false);
                            }
                        }
                        clearSelecting();
                        if (!arrayList.isEmpty()) {
                            SelectorThread.this._workQueue.addAll(arrayList);
                            SelectorThread.this._workQueue.add(this);
                            Iterator it = arrayList.iterator();
                            while (it.hasNext()) {
                                ((ConnectionProcessor) it.next()).processConnection();
                            }
                        }
                    } catch (Throwable th) {
                        clearSelecting();
                        throw th;
                    }
                } catch (Throwable th2) {
                    SelectorThread.this._scheduler.decrementRunningCount();
                    throw th2;
                }
            }
            if (SelectorThread.this._closed.get() && acquireSelecting()) {
                closeSelector();
            }
            SelectorThread.this._scheduler.decrementRunningCount();
        }

        private void closeSelector() {
            try {
                if (this._selector.isOpen()) {
                    this._selector.close();
                }
            } catch (IOException e) {
                SelectorThread.LOGGER.debug("Failed to close selector", e);
            }
        }

        public void wakeup() {
            this._wakeups.compareAndSet(0, 1);
            if (!this._inSelect.get() || this._wakeups.get() == 0) {
                return;
            }
            this._selector.wakeup();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SelectorThread(NetworkConnectionScheduler networkConnectionScheduler, int i) throws IOException {
        this._scheduler = networkConnectionScheduler;
        this._selectionTasks = new SelectionTask[i];
        for (int i2 = 0; i2 < i; i2++) {
            this._selectionTasks[i2] = new SelectionTask();
        }
        for (SelectionTask selectionTask : this._selectionTasks) {
            this._workQueue.add(selectionTask);
        }
    }

    public void addAcceptingSocket(final ServerSocketChannel serverSocketChannel, final NonBlockingNetworkTransport nonBlockingNetworkTransport) {
        this._tasks.add(new Runnable() { // from class: org.apache.qpid.server.transport.SelectorThread.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    if (SelectorThread.LOGGER.isDebugEnabled()) {
                        SelectorThread.LOGGER.debug("Registering selector on accepting port {} ", serverSocketChannel.socket().getLocalSocketAddress());
                    }
                    serverSocketChannel.register(SelectorThread.this._selectionTasks[0].getSelector(), 16, nonBlockingNetworkTransport);
                } catch (IllegalStateException | ClosedChannelException e) {
                    SelectorThread.LOGGER.error("Failed to register selector on accepting port {} ", serverSocketChannel.socket().getLocalSocketAddress(), e);
                }
            }
        });
        this._selectionTasks[0].wakeup();
    }

    public void cancelAcceptingSocket(ServerSocketChannel serverSocketChannel) {
        try {
            cancelAcceptingSocketAsync(serverSocketChannel).get(ACCEPT_CANCELLATION_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOGGER.warn("Cancellation of accepting socket was interrupted");
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            LOGGER.warn("Cancellation of accepting socket failed", e2.getCause());
        } catch (TimeoutException e3) {
            LOGGER.warn("Cancellation of accepting socket timed out");
        }
    }

    private Future<Void> cancelAcceptingSocketAsync(final ServerSocketChannel serverSocketChannel) {
        final SettableFuture create = SettableFuture.create();
        this._tasks.add(new Runnable() { // from class: org.apache.qpid.server.transport.SelectorThread.2
            @Override // java.lang.Runnable
            public void run() {
                if (SelectorThread.LOGGER.isDebugEnabled()) {
                    SelectorThread.LOGGER.debug("Cancelling selector on accepting port {} ", serverSocketChannel.socket().getLocalSocketAddress());
                }
                SelectionKey selectionKey = null;
                try {
                    try {
                        selectionKey = serverSocketChannel.register(SelectorThread.this._selectionTasks[0].getSelector(), 0);
                    } catch (ClosedChannelException e) {
                        SelectorThread.LOGGER.error("Failed to deregister selector on accepting port {}", serverSocketChannel.socket().getLocalSocketAddress(), e);
                    }
                    if (selectionKey != null) {
                        selectionKey.cancel();
                    }
                } finally {
                    create.set((Object) null);
                }
            }
        });
        this._selectionTasks[0].wakeup();
        return create;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        String name = Thread.currentThread().getName();
        do {
            try {
                Thread.currentThread().setName(name);
                this._workQueue.take().run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        } while (!this._closed.get());
    }

    private void unregisterConnection(NonBlockingConnection nonBlockingConnection) throws ClosedChannelException {
        nonBlockingConnection.getSocketChannel().register(nonBlockingConnection.getSelectionTask().getSelector(), 0).cancel();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void runTasks() {
        while (this._tasks.peek() != null) {
            this._tasks.poll().run();
        }
    }

    private boolean selectionInterestRequiresUpdate(NonBlockingConnection nonBlockingConnection) {
        SelectionTask selectionTask = nonBlockingConnection.getSelectionTask();
        if (selectionTask == null) {
            return true;
        }
        SelectionKey keyFor = nonBlockingConnection.getSocketChannel().keyFor(selectionTask.getSelector());
        int i = (nonBlockingConnection.wantsRead() ? 1 : 0) | (nonBlockingConnection.wantsWrite() ? 4 : 0);
        if (keyFor != null) {
            try {
                if (keyFor.isValid()) {
                    if (keyFor.interestOps() == i) {
                        return false;
                    }
                }
            } catch (CancelledKeyException e) {
                return true;
            }
        }
        return true;
    }

    public void addConnection(NonBlockingConnection nonBlockingConnection) {
        if (selectionInterestRequiresUpdate(nonBlockingConnection)) {
            SelectionTask nextSelectionTask = getNextSelectionTask();
            nonBlockingConnection.setSelectionTask(nextSelectionTask);
            nextSelectionTask.getUnregisteredConnections().add(nonBlockingConnection);
            nextSelectionTask.wakeup();
        }
    }

    public void returnConnectionToSelector(NonBlockingConnection nonBlockingConnection) {
        SelectionTask selectionTask = nonBlockingConnection.getSelectionTask();
        if (selectionTask == null) {
            throw new IllegalStateException("returnConnectionToSelector should only be called with connections that are currently assigned a selector task");
        }
        if (selectionInterestRequiresUpdate(nonBlockingConnection) || nonBlockingConnection.getTicker().getModified()) {
            selectionTask.getUnregisteredConnections().add(nonBlockingConnection);
            selectionTask.wakeup();
        }
    }

    private SelectionTask getNextSelectionTask() {
        int i;
        do {
            i = this._nextSelectorTaskIndex.get();
        } while (!this._nextSelectorTaskIndex.compareAndSet(i, (i + 1) % this._selectionTasks.length));
        return this._selectionTasks[i];
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeConnection(NonBlockingConnection nonBlockingConnection) {
        try {
            unregisterConnection(nonBlockingConnection);
        } catch (CancelledKeyException | ClosedSelectorException e) {
            LOGGER.debug("Failed to unregister with selector for connection {}. Port has probably already been closed.", nonBlockingConnection, e);
        } catch (ClosedChannelException e2) {
            LOGGER.debug("Failed to unregister with selector for connection {}. Connection is probably being closed by peer.", nonBlockingConnection);
        }
    }

    public void close() {
        Runnable runnable = new Runnable() { // from class: org.apache.qpid.server.transport.SelectorThread.3
            @Override // java.lang.Runnable
            public void run() {
            }
        };
        this._closed.set(true);
        int poolSize = this._scheduler.getPoolSize();
        while (true) {
            int i = poolSize;
            poolSize--;
            if (i <= 0) {
                break;
            } else {
                this._workQueue.offer(runnable);
            }
        }
        for (SelectionTask selectionTask : this._selectionTasks) {
            selectionTask.wakeup();
        }
    }

    public void addToWork(NonBlockingConnection nonBlockingConnection) {
        if (this._closed.get()) {
            throw new IllegalStateException("Adding connection work " + nonBlockingConnection + " to closed selector thread " + this._scheduler);
        }
        if (nonBlockingConnection.setScheduled()) {
            this._workQueue.add(new ConnectionProcessor(this._scheduler, nonBlockingConnection));
        }
    }
}
