package net.rubyeye.xmemcached.impl;

import com.google.code.yanf4j.config.Configuration;
import com.google.code.yanf4j.core.Controller;
import com.google.code.yanf4j.core.ControllerStateListener;
import com.google.code.yanf4j.core.EventType;
import com.google.code.yanf4j.core.Session;
import com.google.code.yanf4j.nio.NioSession;
import com.google.code.yanf4j.nio.impl.SocketChannelController;
import com.google.code.yanf4j.util.ConcurrentHashSet;
import com.google.code.yanf4j.util.SystemUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.rubyeye.xmemcached.CommandFactory;
import net.rubyeye.xmemcached.MemcachedOptimizer;
import net.rubyeye.xmemcached.MemcachedSessionLocator;
import net.rubyeye.xmemcached.buffer.BufferAllocator;
import net.rubyeye.xmemcached.command.Command;
import net.rubyeye.xmemcached.exception.MemcachedException;
import net.rubyeye.xmemcached.networking.Connector;
import net.rubyeye.xmemcached.networking.MemcachedSession;
import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;
import net.rubyeye.xmemcached.utils.Protocol;

/* loaded from: input_file:net/rubyeye/xmemcached/impl/MemcachedConnector.class */
public class MemcachedConnector extends SocketChannelController implements Connector {
    // Pending reconnect requests; SessionMonitor takes from this queue and re-offers failed attempts.
    private final DelayQueue<ReconnectRequest> waitingQueue;
    // Allocator handed to the optimizer and to every session created by buildSession().
    private BufferAllocator bufferAllocator;
    // Addresses registered by removeReconnectRequest(); SessionMonitor skips requests for them and
    // connect() removes the address from this set again.
    private final Set<InetSocketAddress> removedAddrSet;
    // Command optimizer passed to each new MemcachedTCPSession (identifier is misspelled; kept as-is).
    private final MemcachedOptimizer optimiezer;
    // Base reconnect delay; multiplied by the request's try count when a reconnect is rescheduled.
    // Initialised to 2000 in the constructor - presumably milliseconds, TODO confirm.
    private volatile long healSessionInterval;
    // Upper bound on sessions kept per address in sessionMap (enforced in addMainSession()).
    private int connectionPoolSize;
    // Protocol obtained from the command factory in the constructor.
    protected Protocol protocol;
    // Factory passed to each new MemcachedTCPSession.
    private final CommandFactory commandFactory;
    // When true, removeMainSession() keeps sessions registered and send() may use a standby session.
    private volatile boolean failureMode;
    // Standby sessions, keyed by the address of the main node they back up.
    private final ConcurrentHashMap<InetSocketAddress, List<Session>> standbySessionMap;
    // Locator used to pick a session for a key; refreshed by updateSessions().
    protected MemcachedSessionLocator sessionLocator;
    // Main sessions, keyed by their remote socket address.
    protected final ConcurrentHashMap<InetSocketAddress, Queue<Session>> sessionMap;
    // Ordering applied to the session list before it is handed to the locator.
    private static final MemcachedSessionComparator sessionComparator = new MemcachedSessionComparator();
    // Used by findStandbySession() to choose a standby session at random.
    private final Random random;

    /* loaded from: input_file:net/rubyeye/xmemcached/impl/MemcachedConnector$InnerControllerStateListener.class */
    /**
     * Controller state listener that ties the {@link SessionMonitor} thread to the controller:
     * the monitor is started when the controller reports ready and interrupted when it stops.
     * Exceptions reported by the controller are logged; the remaining callbacks do nothing.
     */
    class InnerControllerStateListener implements ControllerStateListener {
        /** Reconnect thread owned by this listener. */
        private final SessionMonitor sessionMonitor = new SessionMonitor();

        InnerControllerStateListener() {
        }

        /** No action is taken when all sessions are closed. */
        @Override
        public void onAllSessionClosed(Controller controller) {
        }

        /** Logs the throwable reported by the controller. */
        @Override
        public void onException(Controller controller, Throwable th) {
            MemcachedConnector.log.error("Exception occured in controller", th);
        }

        /** Starts the session monitor thread. */
        @Override
        public void onReady(Controller controller) {
            sessionMonitor.start();
        }

        /** No action is taken when the controller has started. */
        @Override
        public void onStarted(Controller controller) {
        }

        /** Interrupts the session monitor thread. */
        @Override
        public void onStopped(Controller controller) {
            sessionMonitor.interrupt();
        }
    }

    /* loaded from: input_file:net/rubyeye/xmemcached/impl/MemcachedConnector$SessionMonitor.class */
    class SessionMonitor extends Thread {
        public SessionMonitor() {
            setName("Heal-Session-Thread");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            InetSocketAddress inetSocketAddress;
            while (MemcachedConnector.this.isStarted()) {
                ReconnectRequest reconnectRequest = null;
                try {
                    reconnectRequest = (ReconnectRequest) MemcachedConnector.this.waitingQueue.take();
                    inetSocketAddress = reconnectRequest.getInetSocketAddressWrapper().getInetSocketAddress();
                } catch (InterruptedException e) {
                } catch (Exception e2) {
                    MemcachedConnector.log.error("SessionMonitor connect error", e2);
                    rescheduleConnectRequest(reconnectRequest);
                }
                if (MemcachedConnector.this.removedAddrSet.contains(inetSocketAddress)) {
                    MemcachedConnector.log.warn("Remove invalid reconnect task for " + inetSocketAddress);
                } else {
                    Future<Boolean> connect = MemcachedConnector.this.connect(reconnectRequest.getInetSocketAddressWrapper());
                    reconnectRequest.setTries(reconnectRequest.getTries() + 1);
                    try {
                        try {
                            MemcachedConnector.log.warn("Trying to connect to " + inetSocketAddress.getAddress().getHostAddress() + ":" + inetSocketAddress.getPort() + " for " + reconnectRequest.getTries() + " times");
                        } catch (Throwable th) {
                            if (0 == 0) {
                                rescheduleConnectRequest(reconnectRequest);
                                throw th;
                                break;
                            }
                        }
                    } catch (ExecutionException e3) {
                        connect.cancel(true);
                        if (0 == 0) {
                            rescheduleConnectRequest(reconnectRequest);
                        }
                    } catch (TimeoutException e4) {
                        connect.cancel(true);
                        if (0 == 0) {
                            rescheduleConnectRequest(reconnectRequest);
                        }
                    }
                    if (!(connect.get(60000L, TimeUnit.MILLISECONDS).booleanValue())) {
                        rescheduleConnectRequest(reconnectRequest);
                    }
                }
            }
        }

        private void rescheduleConnectRequest(ReconnectRequest reconnectRequest) {
            if (reconnectRequest == null) {
                return;
            }
            InetSocketAddress inetSocketAddress = reconnectRequest.getInetSocketAddressWrapper().getInetSocketAddress();
            reconnectRequest.updateNextReconnectTimeStamp(MemcachedConnector.this.healSessionInterval * reconnectRequest.getTries());
            MemcachedConnector.log.error("Reconnect to " + inetSocketAddress.getAddress().getHostAddress() + ":" + inetSocketAddress.getPort() + " fail");
            MemcachedConnector.this.waitingQueue.offer((DelayQueue) reconnectRequest);
        }
    }

    /**
     * Replaces the locator used to select a session for a key.
     *
     * @param memcachedSessionLocator the locator to use from now on
     */
    @Override
    public void setSessionLocator(MemcachedSessionLocator memcachedSessionLocator) {
        sessionLocator = memcachedSessionLocator;
    }

    /**
     * Exposes the queue of pending reconnect requests.
     *
     * @return the live waiting queue (not a copy)
     */
    @Override
    public Queue<ReconnectRequest> getReconnectRequestQueue() {
        return waitingQueue;
    }

    /**
     * Collects the sessions of every address in {@code sessionMap} into a new set.
     *
     * @return a freshly built set holding all main sessions
     */
    @Override
    public Set<Session> getSessionSet() {
        Set<Session> allSessions = new HashSet<Session>();
        for (Queue<Session> perAddress : this.sessionMap.values()) {
            allSessions.addAll(perAddress);
        }
        return allSessions;
    }

    /**
     * Sets the base delay used when a failed reconnect is rescheduled.
     *
     * @param j the new base interval
     */
    @Override
    public final void setHealSessionInterval(long j) {
        healSessionInterval = j;
    }

    /**
     * Returns the base delay used when a failed reconnect is rescheduled.
     *
     * @return the current base interval
     */
    @Override
    public long getHealSessionInterval() {
        return healSessionInterval;
    }

    /**
     * Forwards the "optimize get" switch to the optimizer.
     *
     * @param z the flag value passed to the optimizer
     */
    @Override
    public void setOptimizeGet(boolean z) {
        OptimizerMBean optimizerBean = (OptimizerMBean) this.optimiezer;
        optimizerBean.setOptimizeGet(z);
    }

    /**
     * Forwards the "optimize merge buffer" switch to the optimizer.
     *
     * @param z the flag value passed to the optimizer
     */
    @Override
    public void setOptimizeMergeBuffer(boolean z) {
        OptimizerMBean optimizerBean = (OptimizerMBean) this.optimiezer;
        optimizerBean.setOptimizeMergeBuffer(z);
    }

    /**
     * Returns the protocol this connector was configured with.
     *
     * @return the protocol taken from the command factory at construction time
     */
    public Protocol getProtocol() {
        return protocol;
    }

    /**
     * Registers a newly connected session. A session whose address wrapper names a main node is
     * stored as a standby session for that node; any other session is stored as a main session,
     * after which the session locator is refreshed.
     *
     * @param session the session to register; must be a {@link MemcachedSession}
     */
    public synchronized void addSession(Session session) {
        MemcachedSession memcachedSession = (MemcachedSession) session;
        InetSocketAddress mainNode = memcachedSession.getInetSocketAddressWrapper().getMainNodeAddress();
        if (mainNode == null) {
            addMainSession(session);
            updateSessions();
            return;
        }
        addStandbySession(session, mainNode);
    }

    /**
     * Adds a session to the queue of its remote address, creating the queue on first use, and
     * closes the oldest sessions while the queue exceeds {@code connectionPoolSize}.
     *
     * @param session the main session to store
     */
    private void addMainSession(Session session) {
        InetSocketAddress remote = session.getRemoteSocketAddress();
        log.warn("Add a session: " + SystemUtils.getRawAddress(remote) + ":" + remote.getPort());
        Queue<Session> sessions = this.sessionMap.get(remote);
        if (sessions == null) {
            Queue<Session> created = new ConcurrentLinkedQueue<Session>();
            Queue<Session> existing = this.sessionMap.putIfAbsent(remote, created);
            // Another thread may have registered a queue first; use whichever is in the map.
            sessions = existing != null ? existing : created;
        }
        if (this.failureMode) {
            // In failure mode closed sessions stay registered; drop at most one of them here.
            for (Iterator<Session> it = sessions.iterator(); it.hasNext();) {
                if (it.next().isClosed()) {
                    it.remove();
                    break;
                }
            }
        }
        sessions.offer(session);
        while (sessions.size() > this.connectionPoolSize) {
            Session evicted = sessions.poll();
            // The eviction is deliberate, so the closed session must not reconnect.
            ((MemcachedSession) evicted).setAllowReconnect(false);
            evicted.close();
        }
    }

    /**
     * Stores a session as a standby for the given main node, creating the list on first use.
     *
     * @param session the standby session
     * @param inetSocketAddress address of the main node the session backs up
     */
    private void addStandbySession(Session session, InetSocketAddress inetSocketAddress) {
        InetSocketAddress remote = session.getRemoteSocketAddress();
        log.warn("Add a standby session: " + SystemUtils.getRawAddress(remote) + ":" + remote.getPort() + " for " + SystemUtils.getRawAddress(inetSocketAddress) + ":" + inetSocketAddress.getPort());
        List<Session> standbySessions = this.standbySessionMap.get(inetSocketAddress);
        if (standbySessions == null) {
            List<Session> created = new CopyOnWriteArrayList<Session>();
            List<Session> existing = this.standbySessionMap.putIfAbsent(inetSocketAddress, created);
            // Another thread may have registered a list first; use whichever is in the map.
            standbySessions = existing != null ? existing : created;
        }
        standbySessions.add(session);
    }

    /**
     * Returns a snapshot of the main sessions registered for an address.
     *
     * @param inetSocketAddress the remote address to look up
     * @return a new list copied from the address's queue, or {@code null} when the address has no
     *         queue in {@code sessionMap}
     */
    public List<Session> getSessionListBySocketAddress(InetSocketAddress inetSocketAddress) {
        Queue<Session> sessions = this.sessionMap.get(inetSocketAddress);
        return sessions == null ? null : new ArrayList<Session>(sessions);
    }

    /**
     * Cancels reconnecting to an address: the address is recorded in {@code removedAddrSet} and
     * every queued reconnect request targeting it is removed from the waiting queue.
     *
     * @param inetSocketAddress the address that must no longer be reconnected
     */
    @Override
    public void removeReconnectRequest(InetSocketAddress inetSocketAddress) {
        this.removedAddrSet.add(inetSocketAddress);
        for (Iterator<ReconnectRequest> pending = this.waitingQueue.iterator(); pending.hasNext();) {
            ReconnectRequest request = pending.next();
            if (!request.getInetSocketAddressWrapper().getInetSocketAddress().equals(inetSocketAddress)) {
                continue;
            }
            pending.remove();
            log.warn("Remove invalid reconnect task for " + request.getInetSocketAddressWrapper().getInetSocketAddress());
        }
    }

    /**
     * Rebuilds the locator's view of the cluster: all main sessions are gathered into one list,
     * sorted with {@code sessionComparator} and handed to the session locator.
     */
    @Override
    public final void updateSessions() {
        List<Session> flattened = new ArrayList<Session>(20);
        for (Queue<Session> perAddress : this.sessionMap.values()) {
            flattened.addAll(perAddress);
        }
        Collections.sort(flattened, sessionComparator);
        this.sessionLocator.updateSessions(flattened);
    }

    /**
     * Unregisters a session, from the standby map when its address wrapper names a main node and
     * from the main session map otherwise.
     *
     * @param session the session to remove; must be a {@link MemcachedTCPSession}
     */
    @Override
    public synchronized void removeSession(Session session) {
        MemcachedTCPSession tcpSession = (MemcachedTCPSession) session;
        InetSocketAddress mainNode = tcpSession.getInetSocketAddressWrapper().getMainNodeAddress();
        if (mainNode == null) {
            removeMainSession(session);
            return;
        }
        removeStandbySession(session, mainNode);
    }

    /**
     * Removes a main session from its address queue and refreshes the locator. Nothing is removed
     * while the connector is in failure mode. An address whose queue becomes empty is dropped
     * from {@code sessionMap}.
     *
     * @param session the main session to remove
     */
    private void removeMainSession(Session session) {
        InetSocketAddress remote = session.getRemoteSocketAddress();
        String target = SystemUtils.getRawAddress(remote) + ":" + remote.getPort();
        if (this.failureMode) {
            log.warn("Client in failure mode,we don't remove session " + target);
            return;
        }
        log.warn("Remove a session: " + target);
        Queue<Session> sessions = this.sessionMap.get(session.getRemoteSocketAddress());
        if (sessions == null) {
            return;
        }
        sessions.remove(session);
        if (sessions.size() == 0) {
            this.sessionMap.remove(session.getRemoteSocketAddress());
        }
        updateSessions();
    }

    /**
     * Removes a standby session from the list of its main node; the main node's entry is dropped
     * once its list is empty.
     *
     * @param session the standby session to remove
     * @param inetSocketAddress address of the main node the session backs up
     */
    private void removeStandbySession(Session session, InetSocketAddress inetSocketAddress) {
        List<Session> standbySessions = this.standbySessionMap.get(inetSocketAddress);
        if (standbySessions == null) {
            return;
        }
        standbySessions.remove(session);
        if (standbySessions.size() == 0) {
            this.standbySessionMap.remove(inetSocketAddress);
        }
    }

    /**
     * Start hook: sets the local socket address to {@code localhost} with port 0.
     *
     * @throws IOException declared by the overridden method; not thrown by this body itself
     */
    @Override
    protected void doStart() throws IOException {
        InetSocketAddress localAddress = new InetSocketAddress("localhost", 0);
        setLocalSocketAddress(localAddress);
    }

    /**
     * Completes a non-blocking connect for the given key. On success a session is created and
     * registered and the attached {@link ConnectFuture} is completed with {@code true}; otherwise
     * the key is cancelled and the future is failed.
     *
     * @param selectionKey key whose channel became connectable; its attachment is the
     *        {@link ConnectFuture} stored when the channel was registered
     * @throws IOException if finishing the connection or creating the session fails; the original
     *         failure is attached as the cause
     */
    @Override
    public void onConnect(SelectionKey selectionKey) throws IOException {
        // Stop listening for connect events on this key.
        selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_CONNECT);
        ConnectFuture connectFuture = (ConnectFuture) selectionKey.attachment();
        if (connectFuture == null || connectFuture.isCancelled()) {
            cancelKey(selectionKey);
            return;
        }
        try {
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            if (channel.finishConnect()) {
                selectionKey.attach(null);
                addSession(createSession(channel, connectFuture.getInetSocketAddressWrapper()));
                connectFuture.setResult(Boolean.TRUE);
            } else {
                cancelKey(selectionKey);
                InetSocketAddress target = connectFuture.getInetSocketAddressWrapper().getInetSocketAddress();
                connectFuture.failure(new IOException("Connect to " + SystemUtils.getRawAddress(target) + ":" + target.getPort() + " fail"));
            }
        } catch (Exception e) {
            connectFuture.failure(e);
            cancelKey(selectionKey);
            InetSocketAddress target = connectFuture.getInetSocketAddressWrapper().getInetSocketAddress();
            // Keep the original exception as the cause so its stack trace is not lost.
            throw new IOException("Connect to " + SystemUtils.getRawAddress(target) + ":" + target.getPort() + " fail," + e.getMessage(), e);
        }
    }

    /**
     * Closes the key's channel, if it has one, and always cancels the key.
     *
     * @param selectionKey the key to tear down
     * @throws IOException if closing the channel fails (the key is cancelled regardless)
     */
    private void cancelKey(SelectionKey selectionKey) throws IOException {
        try {
            if (selectionKey.channel() == null) {
                // Nothing to close; the finally block still cancels the key.
                return;
            }
            selectionKey.channel().close();
        } finally {
            selectionKey.cancel();
        }
    }

    /**
     * Builds a session for a connected channel, registers it with the selector manager for read
     * events, starts it and fires its {@code CONNECTED} event.
     *
     * @param socketChannel the connected channel
     * @param inetSocketAddressWrapper address information stored on the new session
     * @return the started session
     */
    protected MemcachedTCPSession createSession(SocketChannel socketChannel, InetSocketAddressWrapper inetSocketAddressWrapper) {
        MemcachedTCPSession session = (MemcachedTCPSession) buildSession(socketChannel);
        session.setInetSocketAddressWrapper(inetSocketAddressWrapper);
        this.selectorManager.registerSession(session, EventType.ENABLE_READ);
        session.start();
        session.onEvent(EventType.CONNECTED, null);
        return session;
    }

    /**
     * Queues a reconnect request so the session monitor retries the connection once the request's
     * delay has elapsed.
     *
     * @param reconnectRequest the request to enqueue
     */
    @Override
    public void addToWatingQueue(ReconnectRequest reconnectRequest) {
        // The request itself is the queue element; it must not be cast to the queue type.
        this.waitingQueue.add(reconnectRequest);
    }

    /**
     * Opens a channel to the given address. If the connection is established immediately the
     * session is created and registered right away; otherwise the channel is registered for
     * connect events and completion is reported through the returned future.
     *
     * @param inetSocketAddressWrapper the address to connect to; must not be {@code null}
     * @return a future that yields {@code true} once the session has been established
     * @throws NullPointerException if {@code inetSocketAddressWrapper} is {@code null}
     * @throws IOException if the channel cannot be opened, configured or connected; the channel
     *         is closed before the exception is rethrown
     */
    @Override
    public Future<Boolean> connect(InetSocketAddressWrapper inetSocketAddressWrapper) throws IOException {
        if (inetSocketAddressWrapper == null) {
            throw new NullPointerException("Null Address");
        }
        // An explicit connect re-enables reconnecting to this address.
        this.removedAddrSet.remove(inetSocketAddressWrapper.getInetSocketAddress());
        SocketChannel socketChannel = null;
        try {
            socketChannel = SocketChannel.open();
            configureSocketChannel(socketChannel);
            ConnectFuture connectFuture = new ConnectFuture(inetSocketAddressWrapper);
            if (socketChannel.connect(inetSocketAddressWrapper.getInetSocketAddress())) {
                addSession(createSession(socketChannel, inetSocketAddressWrapper));
                connectFuture.setResult(true);
            } else {
                this.selectorManager.registerChannel(socketChannel, SelectionKey.OP_CONNECT, connectFuture);
            }
            return connectFuture;
        } catch (IOException e) {
            if (socketChannel != null) {
                try {
                    socketChannel.close();
                } catch (IOException closeFailure) {
                    // Report the original failure; keep the close failure attached to it.
                    e.addSuppressed(closeFailure);
                }
            }
            throw e;
        }
    }

    /**
     * Intentionally empty: this implementation performs no work when asked to close a channel
     * for the given selector.
     */
    @Override // com.google.code.yanf4j.nio.SelectionKeyHandler
    public void closeChannel(Selector selector) throws IOException {
    }

    /**
     * Writes a command to the session selected for the command's key. A closed session may be
     * replaced by a standby session (see {@code findStandbySession}).
     *
     * @param command the command to send
     * @throws MemcachedException if no session is available, the chosen session is closed, or its
     *         authentication has failed
     */
    @Override
    public void send(Command command) throws MemcachedException {
        MemcachedSession target = (MemcachedSession) findSessionByKey(command.getKey());
        if (target == null) {
            throw new MemcachedException("There is no available connection at this moment");
        }
        if (target.isClosed()) {
            target = findStandbySession(target);
        }
        if (target.isClosed()) {
            String remote = SystemUtils.getRawAddress(target.getRemoteSocketAddress()) + ":" + target.getRemoteSocketAddress().getPort();
            throw new MemcachedException("Session(" + remote + ") has been closed");
        }
        if (target.isAuthFailed()) {
            throw new MemcachedException("Auth failed to connection " + target.getRemoteSocketAddress());
        }
        target.write(command);
    }

    /**
     * Picks a replacement for a closed main session. Outside failure mode, or when the main node
     * has no standby sessions, the given session is returned unchanged; otherwise one standby
     * session of the node is chosen at random.
     *
     * @param memcachedSession the main session to find a standby for
     * @return a randomly chosen standby session, or {@code memcachedSession} itself
     */
    private MemcachedSession findStandbySession(MemcachedSession memcachedSession) {
        if (!this.failureMode) {
            return memcachedSession;
        }
        List<Session> standbySessions = getStandbySessionListByMainNodeAddr(memcachedSession.getRemoteSocketAddress());
        if (standbySessions == null || standbySessions.isEmpty()) {
            return memcachedSession;
        }
        int pick = this.random.nextInt(standbySessions.size());
        return (MemcachedTCPSession) standbySessions.get(pick);
    }

    /**
     * Looks up the standby sessions registered for a main node.
     *
     * @param inetSocketAddress address of the main node
     * @return the live list stored in {@code standbySessionMap}, or {@code null} if there is none
     */
    @Override
    public List<Session> getStandbySessionListByMainNodeAddr(InetSocketAddress inetSocketAddress) {
        List<Session> standbySessions = this.standbySessionMap.get(inetSocketAddress);
        return standbySessions;
    }

    /**
     * Asks the session locator for the session responsible for a key.
     *
     * @param str the key
     * @return whatever the locator returns for the key
     */
    public final Session findSessionByKey(String str) {
        Session located = this.sessionLocator.getSessionByKey(str);
        return located;
    }

    /**
     * Looks up the main sessions registered for a remote address.
     *
     * @param inetSocketAddress the remote address
     * @return the live queue stored in {@code sessionMap}, or {@code null} if there is none
     */
    @Override
    public final Queue<Session> getSessionByAddress(InetSocketAddress inetSocketAddress) {
        Queue<Session> sessions = this.sessionMap.get(inetSocketAddress);
        return sessions;
    }

    /**
     * Creates a connector and wires up its collaborators.
     *
     * @param configuration controller configuration passed to the superclass
     * @param memcachedSessionLocator locator used to choose a session per key; it is invoked during
     *        construction (via {@code updateSessions()}), so it must not be {@code null}
     * @param bufferAllocator allocator given to the optimizer and to new sessions
     * @param commandFactory factory supplying the protocol and passed to new sessions
     * @param i connection pool size, i.e. the maximum number of sessions kept per address
     */
    public MemcachedConnector(Configuration configuration, MemcachedSessionLocator memcachedSessionLocator, BufferAllocator bufferAllocator, CommandFactory commandFactory, int i) {
        super(configuration, null);
        this.waitingQueue = new DelayQueue<>();
        this.removedAddrSet = new ConcurrentHashSet();
        // Default base delay for rescheduled reconnects.
        this.healSessionInterval = 2000L;
        this.standbySessionMap = new ConcurrentHashMap<>();
        this.sessionMap = new ConcurrentHashMap<>();
        this.random = new Random();
        this.sessionLocator = memcachedSessionLocator;
        this.protocol = commandFactory.getProtocol();
        // The listener starts/interrupts the session monitor thread with the controller.
        addStateListener(new InnerControllerStateListener());
        // sessionMap is still empty here, so the locator receives an empty session list.
        updateSessions();
        this.bufferAllocator = bufferAllocator;
        this.optimiezer = new Optimizer(this.protocol);
        this.optimiezer.setBufferAllocator(this.bufferAllocator);
        this.connectionPoolSize = i;
        this.soLingerOn = true;
        this.commandFactory = commandFactory;
        // Two selectors per available processor.
        setSelectorPoolSize(2 * Runtime.getRuntime().availableProcessors());
    }

    /**
     * Sets the maximum number of sessions kept per address.
     *
     * @param i the new pool size
     */
    @Override
    public final void setConnectionPoolSize(int i) {
        connectionPoolSize = i;
    }

    /**
     * Forwards the merge factor to the optimizer.
     *
     * @param i the merge factor passed to the optimizer
     */
    @Override
    public void setMergeFactor(int i) {
        OptimizerMBean optimizerBean = (OptimizerMBean) this.optimiezer;
        optimizerBean.setMergeFactor(i);
    }

    /**
     * Creates a {@link MemcachedTCPSession} for a channel, configured with this connector's read
     * buffer size, optimizer, read thread count, command factory and buffer allocator.
     *
     * @param socketChannel the channel the session wraps
     * @return the new, not yet started session
     */
    @Override
    public NioSession buildSession(SocketChannel socketChannel) {
        MemcachedTCPSession session = new MemcachedTCPSession(
                buildSessionConfig(socketChannel, buildQueue()),
                this.configuration.getSessionReadBufferSize(),
                this.optimiezer,
                getReadThreadCount(),
                this.commandFactory);
        session.setBufferAllocator(this.bufferAllocator);
        return session;
    }

    /**
     * Returns the allocator currently used for new sessions.
     *
     * @return the buffer allocator
     */
    public BufferAllocator getBufferAllocator() {
        return bufferAllocator;
    }

    /**
     * Asks every session in {@code sessionSet} to quit, then waits up to five times for one second
     * each while {@code sessionSet} is still non-empty. If the waiting thread is interrupted, the
     * interrupt flag is restored and the method returns immediately.
     */
    @Override
    public synchronized void quitAllSessions() {
        for (Session session : this.sessionSet) {
            ((MemcachedSession) session).quit();
        }
        for (int attempt = 0; attempt < 5 && this.sessionSet.size() > 0; attempt++) {
            try {
                wait(1000L);
            } catch (InterruptedException e) {
                // With the flag restored every further wait() would throw at once, so stop here.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Switches failure mode on or off.
     *
     * @param z {@code true} to enable failure mode
     */
    @Override
    public void setFailureMode(boolean z) {
        failureMode = z;
    }

    /**
     * Replaces the buffer allocator for future sessions and pushes it to every session returned
     * by {@code getSessionSet()}.
     *
     * @param bufferAllocator the allocator to use from now on
     */
    @Override
    public void setBufferAllocator(BufferAllocator bufferAllocator) {
        this.bufferAllocator = bufferAllocator;
        for (Session session : getSessionSet()) {
            ((MemcachedSession) session).setBufferAllocator(bufferAllocator);
        }
    }

    /**
     * Returns the addresses that currently have an entry in {@code sessionMap}.
     *
     * @return an unmodifiable view of the map's key set
     */
    public Collection<InetSocketAddress> getServerAddresses() {
        Set<InetSocketAddress> addresses = this.sessionMap.keySet();
        return Collections.unmodifiableCollection(addresses);
    }
}
