package net.spy.memcached;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import net.spy.memcached.compat.SpyObject;
import net.spy.memcached.ops.KeyedOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationState;

/* loaded from: input_file:net/spy/memcached/MemcachedConnection.class */
public final class MemcachedConnection extends SpyObject {
    private static final int DOUBLE_CHECK_EMPTY = 256;
    private static final int EXCESSIVE_EMPTY = 16777216;
    private final boolean shouldOptimize;
    private Selector selector;
    private final NodeLocator locator;
    private final FailureMode failureMode;
    private final long maxDelay;
    private final ConcurrentLinkedQueue<MemcachedNode> addedQueue;
    private final SortedMap<Long, MemcachedNode> reconnectQueue;
    private final OperationFactory opFact;
    private final int timeoutExceptionThreshold;
    private static AtomicInteger continuousTimeout;
    static final /* synthetic */ boolean $assertionsDisabled;
    private volatile boolean shutDown = false;
    private int emptySelects = 0;
    private final Collection<ConnectionObserver> connObservers = new ConcurrentLinkedQueue();

    public static void setContinuousTimeout(boolean z) {
        if (z) {
            continuousTimeout.getAndAdd(1);
        } else {
            continuousTimeout.set(0);
        }
    }

    public MemcachedConnection(int i, ConnectionFactory connectionFactory, List<InetSocketAddress> list, Collection<ConnectionObserver> collection, FailureMode failureMode, OperationFactory operationFactory) throws IOException {
        this.selector = null;
        this.connObservers.addAll(collection);
        this.reconnectQueue = new TreeMap();
        this.addedQueue = new ConcurrentLinkedQueue<>();
        this.failureMode = failureMode;
        this.shouldOptimize = connectionFactory.shouldOptimize();
        this.maxDelay = connectionFactory.getMaxReconnectDelay();
        this.opFact = operationFactory;
        this.timeoutExceptionThreshold = connectionFactory.getTimeoutExceptionThreshold();
        this.selector = Selector.open();
        ArrayList arrayList = new ArrayList(list.size());
        for (InetSocketAddress inetSocketAddress : list) {
            SocketChannel open = SocketChannel.open();
            open.configureBlocking(false);
            MemcachedNode createMemcachedNode = connectionFactory.createMemcachedNode(inetSocketAddress, open, i);
            int i2 = 0;
            open.socket().setTcpNoDelay(!connectionFactory.useNagleAlgorithm());
            try {
                if (open.connect(inetSocketAddress)) {
                    getLogger().info("Connected to %s immediately", createMemcachedNode);
                    connected(createMemcachedNode);
                } else {
                    getLogger().info("Added %s to connect queue", createMemcachedNode);
                    i2 = 8;
                }
                createMemcachedNode.setSk(open.register(this.selector, i2, createMemcachedNode));
            } catch (SocketException e) {
                getLogger().warn("Socket error on initial connect", e);
                queueReconnect(createMemcachedNode);
            }
            if (!$assertionsDisabled && !open.isConnected() && createMemcachedNode.getSk().interestOps() != 8) {
                throw new AssertionError("Not connected, and not wanting to connect");
                break;
            }
            arrayList.add(createMemcachedNode);
        }
        this.locator = connectionFactory.createLocator(arrayList);
    }

    private boolean selectorsMakeSense() {
        for (MemcachedNode memcachedNode : this.locator.getAll()) {
            if (memcachedNode.getSk() != null && memcachedNode.getSk().isValid()) {
                if (memcachedNode.getChannel().isConnected()) {
                    int interestOps = memcachedNode.getSk().interestOps();
                    int i = memcachedNode.hasReadOp() ? 0 | 1 : 0;
                    if (memcachedNode.hasWriteOp()) {
                        i |= 4;
                    }
                    if (memcachedNode.getBytesRemainingToWrite() > 0) {
                        i |= 4;
                    }
                    if (!$assertionsDisabled && interestOps != i) {
                        throw new AssertionError("Invalid ops:  " + memcachedNode + ", expected " + i + ", got " + interestOps);
                    }
                } else {
                    int interestOps2 = memcachedNode.getSk().interestOps();
                    if (!$assertionsDisabled && interestOps2 != 8) {
                        throw new AssertionError("Not connected, and not watching for connect: " + interestOps2);
                    }
                }
            }
        }
        getLogger().debug("Checked the selectors.");
        return true;
    }

    public void handleIO() throws IOException {
        if (this.shutDown) {
            throw new IOException("No IO while shut down");
        }
        handleInputQueue();
        getLogger().debug("Done dealing with queue.");
        long max = this.reconnectQueue.isEmpty() ? 0L : Math.max(this.reconnectQueue.firstKey().longValue() - System.currentTimeMillis(), 1L);
        getLogger().debug("Selecting with delay of %sms", Long.valueOf(max));
        if (!$assertionsDisabled && !selectorsMakeSense()) {
            throw new AssertionError("Selectors don't make sense.");
        }
        int select = this.selector.select(max);
        Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
        if (!selectedKeys.isEmpty() || this.shutDown) {
            getLogger().debug("Selected %d, selected %d keys", Integer.valueOf(select), Integer.valueOf(selectedKeys.size()));
            this.emptySelects = 0;
            for (SelectionKey selectionKey : selectedKeys) {
                if (continuousTimeout.get() > this.timeoutExceptionThreshold) {
                    lostConnection((MemcachedNode) selectionKey.attachment());
                } else {
                    handleIO(selectionKey);
                }
            }
            selectedKeys.clear();
        } else {
            getLogger().debug("No selectors ready, interrupted: " + Thread.interrupted());
            int i = this.emptySelects + 1;
            this.emptySelects = i;
            if (i > DOUBLE_CHECK_EMPTY) {
                for (SelectionKey selectionKey2 : this.selector.keys()) {
                    getLogger().info("%s has %s, interested in %s", selectionKey2, Integer.valueOf(selectionKey2.readyOps()), Integer.valueOf(selectionKey2.interestOps()));
                    if (selectionKey2.readyOps() != 0) {
                        getLogger().info("%s has a ready op, handling IO", selectionKey2);
                        handleIO(selectionKey2);
                    } else {
                        lostConnection((MemcachedNode) selectionKey2.attachment());
                    }
                }
                if (!$assertionsDisabled && this.emptySelects >= EXCESSIVE_EMPTY) {
                    throw new AssertionError("Too many empty selects");
                }
            }
        }
        if (this.shutDown || this.reconnectQueue.isEmpty()) {
            return;
        }
        attemptReconnects();
    }

    private void handleInputQueue() {
        if (this.addedQueue.isEmpty()) {
            return;
        }
        getLogger().debug("Handling queue");
        HashSet hashSet = new HashSet();
        HashSet<MemcachedNode> hashSet2 = new HashSet();
        while (true) {
            try {
                MemcachedNode remove = this.addedQueue.remove();
                if (remove == null) {
                    break;
                } else {
                    hashSet2.add(remove);
                }
            } catch (NoSuchElementException e) {
            }
        }
        for (MemcachedNode memcachedNode : hashSet2) {
            boolean z = false;
            if (!memcachedNode.isActive()) {
                hashSet.add(memcachedNode);
            } else if (memcachedNode.getCurrentWriteOp() != null) {
                z = true;
                getLogger().debug("Handling queued write %s", memcachedNode);
            }
            memcachedNode.copyInputQueue();
            if (z) {
                try {
                    if (memcachedNode.getWbuf().hasRemaining()) {
                        handleWrites(memcachedNode.getSk(), memcachedNode);
                    }
                } catch (IOException e2) {
                    getLogger().warn("Exception handling write", e2);
                    lostConnection(memcachedNode);
                }
            }
            memcachedNode.fixupOps();
        }
        this.addedQueue.addAll(hashSet);
    }

    public boolean addObserver(ConnectionObserver connectionObserver) {
        return this.connObservers.add(connectionObserver);
    }

    public boolean removeObserver(ConnectionObserver connectionObserver) {
        return this.connObservers.remove(connectionObserver);
    }

    private void connected(MemcachedNode memcachedNode) {
        if (!$assertionsDisabled && !memcachedNode.getChannel().isConnected()) {
            throw new AssertionError("Not connected.");
        }
        int reconnectCount = memcachedNode.getReconnectCount();
        memcachedNode.connected();
        Iterator<ConnectionObserver> it = this.connObservers.iterator();
        while (it.hasNext()) {
            it.next().connectionEstablished(memcachedNode.getSocketAddress(), reconnectCount);
        }
    }

    private void lostConnection(MemcachedNode memcachedNode) {
        queueReconnect(memcachedNode);
        Iterator<ConnectionObserver> it = this.connObservers.iterator();
        while (it.hasNext()) {
            it.next().connectionLost(memcachedNode.getSocketAddress());
        }
    }

    private void handleIO(SelectionKey selectionKey) {
        MemcachedNode memcachedNode = (MemcachedNode) selectionKey.attachment();
        try {
            getLogger().debug("Handling IO for:  %s (r=%s, w=%s, c=%s, op=%s)", selectionKey, Boolean.valueOf(selectionKey.isReadable()), Boolean.valueOf(selectionKey.isWritable()), Boolean.valueOf(selectionKey.isConnectable()), selectionKey.attachment());
            if (selectionKey.isConnectable()) {
                getLogger().info("Connection state changed for %s", selectionKey);
                SocketChannel channel = memcachedNode.getChannel();
                if (channel.finishConnect()) {
                    connected(memcachedNode);
                    this.addedQueue.offer(memcachedNode);
                    if (memcachedNode.getWbuf().hasRemaining()) {
                        handleWrites(selectionKey, memcachedNode);
                    }
                } else if (!$assertionsDisabled && channel.isConnected()) {
                    throw new AssertionError("connected");
                }
            } else {
                if (selectionKey.isReadable()) {
                    handleReads(selectionKey, memcachedNode);
                }
                if (selectionKey.isWritable()) {
                    handleWrites(selectionKey, memcachedNode);
                }
            }
        } catch (ConnectException e) {
            getLogger().info("Reconnecting due to failure to connect to %s", memcachedNode, e);
            queueReconnect(memcachedNode);
        } catch (ClosedChannelException e2) {
            if (!this.shutDown) {
                getLogger().info("Closed channel and not shutting down.  Queueing reconnect on %s", memcachedNode, e2);
                lostConnection(memcachedNode);
            }
        } catch (OperationException e3) {
            memcachedNode.setupForAuth();
            getLogger().info("Reconnection due to exception handling a memcached operation on %s.  This may be due to an authentication failure.", memcachedNode, e3);
            lostConnection(memcachedNode);
        } catch (Exception e4) {
            memcachedNode.setupForAuth();
            getLogger().info("Reconnecting due to exception on %s", memcachedNode, e4);
            lostConnection(memcachedNode);
        }
        memcachedNode.fixupOps();
    }

    private void handleWrites(SelectionKey selectionKey, MemcachedNode memcachedNode) throws IOException {
        memcachedNode.fillWriteBuffer(this.shouldOptimize);
        boolean z = memcachedNode.getBytesRemainingToWrite() > 0;
        while (z) {
            int writeSome = memcachedNode.writeSome();
            memcachedNode.fillWriteBuffer(this.shouldOptimize);
            z = writeSome > 0 && memcachedNode.getBytesRemainingToWrite() > 0;
        }
    }

    private void handleReads(SelectionKey selectionKey, MemcachedNode memcachedNode) throws IOException {
        Operation currentReadOp = memcachedNode.getCurrentReadOp();
        ByteBuffer rbuf = memcachedNode.getRbuf();
        SocketChannel channel = memcachedNode.getChannel();
        int read = channel.read(rbuf);
        if (read < 0) {
            throw new IOException("Disconnected unexpected, will reconnect.");
        }
        while (read > 0) {
            getLogger().debug("Read %d bytes", Integer.valueOf(read));
            rbuf.flip();
            while (rbuf.remaining() > 0) {
                if (currentReadOp == null) {
                    throw new IllegalStateException("No read operation.");
                }
                currentReadOp.readFromBuffer(rbuf);
                if (currentReadOp.getState() == OperationState.COMPLETE) {
                    getLogger().debug("Completed read op: %s and giving the next %d bytes", currentReadOp, Integer.valueOf(rbuf.remaining()));
                    Operation removeCurrentReadOp = memcachedNode.removeCurrentReadOp();
                    if (!$assertionsDisabled && removeCurrentReadOp != currentReadOp) {
                        throw new AssertionError("Expected to pop " + currentReadOp + " got " + removeCurrentReadOp);
                    }
                    currentReadOp = memcachedNode.getCurrentReadOp();
                }
            }
            rbuf.clear();
            read = channel.read(rbuf);
        }
    }

    static String dbgBuffer(ByteBuffer byteBuffer, int i) {
        StringBuilder sb = new StringBuilder();
        byte[] array = byteBuffer.array();
        for (int i2 = 0; i2 < i; i2++) {
            char c = (char) array[i2];
            if (Character.isWhitespace(c) || Character.isLetterOrDigit(c)) {
                sb.append(c);
            } else {
                sb.append("\\x");
                sb.append(Integer.toHexString(array[i2] & 255));
            }
        }
        return sb.toString();
    }

    private void queueReconnect(MemcachedNode memcachedNode) {
        long j;
        if (this.shutDown) {
            return;
        }
        getLogger().warn("Closing, and reopening %s, attempt %d.", memcachedNode, Integer.valueOf(memcachedNode.getReconnectCount()));
        if (memcachedNode.getSk() != null) {
            memcachedNode.getSk().cancel();
            if (!$assertionsDisabled && memcachedNode.getSk().isValid()) {
                throw new AssertionError("Cancelled selection key is valid");
            }
        }
        memcachedNode.reconnecting();
        try {
            if (memcachedNode.getChannel() == null || memcachedNode.getChannel().socket() == null) {
                getLogger().info("The channel or socket was null for %s", memcachedNode);
            } else {
                memcachedNode.getChannel().socket().close();
            }
        } catch (IOException e) {
            getLogger().warn("IOException trying to close a socket", e);
        }
        memcachedNode.setChannel(null);
        long min = ((long) Math.min(this.maxDelay, Math.pow(2.0d, memcachedNode.getReconnectCount()))) * 1000;
        long currentTimeMillis = System.currentTimeMillis();
        long j2 = min;
        while (true) {
            j = currentTimeMillis + j2;
            if (!this.reconnectQueue.containsKey(Long.valueOf(j))) {
                break;
            }
            currentTimeMillis = j;
            j2 = 1;
        }
        this.reconnectQueue.put(Long.valueOf(j), memcachedNode);
        memcachedNode.setupResend();
        if (this.failureMode == FailureMode.Redistribute) {
            redistributeOperations(memcachedNode.destroyInputQueue());
        } else if (this.failureMode == FailureMode.Cancel) {
            cancelOperations(memcachedNode.destroyInputQueue());
        }
    }

    private void cancelOperations(Collection<Operation> collection) {
        Iterator<Operation> it = collection.iterator();
        while (it.hasNext()) {
            it.next().cancel();
        }
    }

    private void redistributeOperations(Collection<Operation> collection) {
        for (Operation operation : collection) {
            if (operation instanceof KeyedOperation) {
                KeyedOperation keyedOperation = (KeyedOperation) operation;
                int i = 0;
                for (String str : keyedOperation.getKeys()) {
                    Iterator<Operation> it = this.opFact.clone(keyedOperation).iterator();
                    while (it.hasNext()) {
                        addOperation(str, it.next());
                        i++;
                    }
                }
                if (!$assertionsDisabled && i <= 0) {
                    throw new AssertionError("Didn't add any new operations when redistributing");
                }
            } else {
                operation.cancel();
            }
        }
    }

    private void attemptReconnects() throws IOException {
        long currentTimeMillis = System.currentTimeMillis();
        IdentityHashMap identityHashMap = new IdentityHashMap();
        ArrayList arrayList = new ArrayList();
        Iterator<MemcachedNode> it = this.reconnectQueue.headMap(Long.valueOf(currentTimeMillis)).values().iterator();
        while (it.hasNext()) {
            MemcachedNode next = it.next();
            it.remove();
            try {
                if (identityHashMap.containsKey(next)) {
                    getLogger().debug("Skipping duplicate reconnect request for %s", next);
                } else {
                    identityHashMap.put(next, Boolean.TRUE);
                    getLogger().info("Reconnecting %s", next);
                    SocketChannel open = SocketChannel.open();
                    open.configureBlocking(false);
                    int i = 0;
                    if (open.connect(next.getSocketAddress())) {
                        getLogger().info("Immediately reconnected to %s", next);
                        if (!$assertionsDisabled && !open.isConnected()) {
                            throw new AssertionError();
                        }
                    } else {
                        i = 8;
                    }
                    next.registerChannel(open, open.register(this.selector, i, next));
                    if (!$assertionsDisabled && next.getChannel() != open) {
                        throw new AssertionError("Channel was lost.");
                    }
                }
            } catch (SocketException e) {
                getLogger().warn("Error on reconnect", e);
                arrayList.add(next);
            }
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            queueReconnect((MemcachedNode) it2.next());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NodeLocator getLocator() {
        return this.locator;
    }

    public void addOperation(String str, Operation operation) {
        MemcachedNode memcachedNode = null;
        MemcachedNode primary = this.locator.getPrimary(str);
        if (primary.isActive() || this.failureMode == FailureMode.Retry) {
            memcachedNode = primary;
        } else if (this.failureMode == FailureMode.Cancel) {
            operation.cancel();
        } else {
            Iterator<MemcachedNode> sequence = this.locator.getSequence(str);
            while (memcachedNode == null && sequence.hasNext()) {
                MemcachedNode next = sequence.next();
                if (next.isActive()) {
                    memcachedNode = next;
                }
            }
            if (memcachedNode == null) {
                memcachedNode = primary;
            }
        }
        if (!$assertionsDisabled && !operation.isCancelled() && memcachedNode == null) {
            throw new AssertionError("No node found for key " + str);
        }
        if (memcachedNode != null) {
            addOperation(memcachedNode, operation);
        } else if (!$assertionsDisabled && !operation.isCancelled()) {
            throw new AssertionError("No not found for " + str + " (and not immediately cancelled)");
        }
    }

    public void insertOperation(MemcachedNode memcachedNode, Operation operation) {
        operation.setHandlingNode(memcachedNode);
        operation.initialize();
        memcachedNode.insertOp(operation);
        this.addedQueue.offer(memcachedNode);
        Selector wakeup = this.selector.wakeup();
        if (!$assertionsDisabled && wakeup != this.selector) {
            throw new AssertionError("Wakeup returned the wrong selector.");
        }
        getLogger().debug("Added %s to %s", operation, memcachedNode);
    }

    public void addOperation(MemcachedNode memcachedNode, Operation operation) {
        operation.setHandlingNode(memcachedNode);
        operation.initialize();
        memcachedNode.addOp(operation);
        this.addedQueue.offer(memcachedNode);
        Selector wakeup = this.selector.wakeup();
        if (!$assertionsDisabled && wakeup != this.selector) {
            throw new AssertionError("Wakeup returned the wrong selector.");
        }
        getLogger().debug("Added %s to %s", operation, memcachedNode);
    }

    public void addOperations(Map<MemcachedNode, Operation> map) {
        for (Map.Entry<MemcachedNode, Operation> entry : map.entrySet()) {
            MemcachedNode key = entry.getKey();
            Operation value = entry.getValue();
            value.setHandlingNode(key);
            value.initialize();
            key.addOp(value);
            this.addedQueue.offer(key);
        }
        Selector wakeup = this.selector.wakeup();
        if (!$assertionsDisabled && wakeup != this.selector) {
            throw new AssertionError("Wakeup returned the wrong selector.");
        }
    }

    public CountDownLatch broadcastOperation(BroadcastOpFactory broadcastOpFactory) {
        return broadcastOperation(broadcastOpFactory, this.locator.getAll());
    }

    public CountDownLatch broadcastOperation(BroadcastOpFactory broadcastOpFactory, Collection<MemcachedNode> collection) {
        CountDownLatch countDownLatch = new CountDownLatch(this.locator.getAll().size());
        for (MemcachedNode memcachedNode : collection) {
            Operation newOp = broadcastOpFactory.newOp(memcachedNode, countDownLatch);
            newOp.initialize();
            memcachedNode.addOp(newOp);
            newOp.setHandlingNode(memcachedNode);
            this.addedQueue.offer(memcachedNode);
        }
        Selector wakeup = this.selector.wakeup();
        if ($assertionsDisabled || wakeup == this.selector) {
            return countDownLatch;
        }
        throw new AssertionError("Wakeup returned the wrong selector.");
    }

    public void shutdown() throws IOException {
        this.shutDown = true;
        Selector wakeup = this.selector.wakeup();
        if (!$assertionsDisabled && wakeup != this.selector) {
            throw new AssertionError("Wakeup returned the wrong selector.");
        }
        for (MemcachedNode memcachedNode : this.locator.getAll()) {
            if (memcachedNode.getChannel() != null) {
                memcachedNode.getChannel().close();
                memcachedNode.setSk(null);
                if (memcachedNode.getBytesRemainingToWrite() > 0) {
                    getLogger().warn("Shut down with %d bytes remaining to write", Integer.valueOf(memcachedNode.getBytesRemainingToWrite()));
                }
                getLogger().debug("Shut down channel %s", memcachedNode.getChannel());
            }
        }
        this.selector.close();
        getLogger().debug("Shut down selector %s", this.selector);
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("{MemcachedConnection to");
        for (MemcachedNode memcachedNode : this.locator.getAll()) {
            sb.append(" ");
            sb.append(memcachedNode.getSocketAddress());
        }
        sb.append("}");
        return sb.toString();
    }

    static {
        $assertionsDisabled = !MemcachedConnection.class.desiredAssertionStatus();
        continuousTimeout = new AtomicInteger(0);
    }
}
