package net.spy.memcached.protocol;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.compat.SpyObject;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;

/* loaded from: input_file:net/spy/memcached/protocol/TCPMemcachedNodeImpl.class */
/**
 * Base {@link MemcachedNode} implementation that talks to a server over a
 * {@link SocketChannel}.
 *
 * <p>Operations travel through three queues: {@link #addOp} appends to the
 * input queue, {@link #copyInputQueue} moves them onto the write queue, and
 * {@link #fillWriteBuffer} adds each operation to the read queue while
 * copying its bytes into the outgoing buffer.
 *
 * <p>NOTE(review): apart from the two {@code volatile} fields, this class does
 * no synchronization of its own; presumably most methods are only called from
 * a single I/O thread -- confirm against the callers.
 */
public abstract class TCPMemcachedNodeImpl extends SpyObject implements MemcachedNode {
    /** Address this node was created for; never changes. */
    private final SocketAddress socketAddress;
    /** Read buffer handed out via {@link #getRbuf()}. */
    private final ByteBuffer rbuf;
    /** Buffer in which outgoing bytes are staged before {@link #writeSome()}. */
    private final ByteBuffer wbuf;
    /** Operations waiting to be copied into {@link #wbuf}. */
    protected final BlockingQueue<Operation> writeQ;
    /** Operations that have been (at least partially) copied into {@link #wbuf}. */
    private final BlockingQueue<Operation> readQ;
    /** Operations accepted by {@link #addOp} but not yet moved to {@link #writeQ}. */
    private final BlockingQueue<Operation> inputQueue;
    /** Current channel; replaced through {@link #setChannel}. */
    private SocketChannel channel;
    /** Zero once {@link #connected()} has been called; starts at 1. */
    private volatile int reconnectAttempt = 1;
    /** Bytes staged in {@link #wbuf} that {@link #writeSome()} has not yet written. */
    private int toWrite = 0;
    /**
     * When non-null, this operation is treated as the current write operation
     * ahead of the head of {@link #writeQ}. Nothing in this class assigns a
     * non-null value; presumably {@link #optimize()} implementations do.
     */
    protected Operation optimizedOp = null;
    /** Selection key most recently supplied through {@link #setSk}. */
    private volatile SelectionKey sk = null;

    /**
     * Creates a node with freshly allocated read and write buffers.
     *
     * <p>The argument checks below are assertions, so they are only enforced
     * when the JVM runs with assertions enabled.
     *
     * @param sa address of the server; must not be null
     * @param c initial channel; must not be null
     * @param bufSize capacity, in bytes, of each of the two buffers; must be positive
     * @param rq queue used as the read queue; must not be null
     * @param wq queue used as the write queue; must not be null
     * @param iq queue used as the input queue; must not be null
     */
    public TCPMemcachedNodeImpl(SocketAddress sa, SocketChannel c, int bufSize,
            BlockingQueue<Operation> rq, BlockingQueue<Operation> wq,
            BlockingQueue<Operation> iq) {
        assert sa != null : "No SocketAddress";
        assert c != null : "No SocketChannel";
        assert bufSize > 0 : "Invalid buffer size: " + bufSize;
        assert rq != null : "No operation read queue";
        assert wq != null : "No operation write queue";
        assert iq != null : "No input queue";
        this.socketAddress = sa;
        setChannel(c);
        this.rbuf = ByteBuffer.allocate(bufSize);
        this.wbuf = ByteBuffer.allocate(bufSize);
        getWbuf().clear();
        this.readQ = rq;
        this.writeQ = wq;
        this.inputQueue = iq;
    }

    /**
     * Moves operations from the input queue to the write queue, taking at most
     * as many as the write queue reported room for.
     */
    @Override // net.spy.memcached.MemcachedNode
    public final void copyInputQueue() {
        Collection<Operation> pending = new ArrayList<Operation>();
        // Cap the drain at the write queue's free space so addAll() is not
        // asked to insert more than was reported as fitting.
        inputQueue.drainTo(pending, writeQ.remainingCapacity());
        writeQ.addAll(pending);
    }

    /**
     * Empties the input queue.
     *
     * @return the operations that were removed, in drain order
     */
    @Override // net.spy.memcached.MemcachedNode
    public Collection<Operation> destroyInputQueue() {
        Collection<Operation> drained = new ArrayList<Operation>();
        inputQueue.drainTo(drained);
        return drained;
    }

    /**
     * Rewinds this node's state so that the current write operation can be
     * sent again: its buffer is reset, every operation on the read queue other
     * than the current write operation is cancelled, and both node buffers
     * are cleared.
     */
    @Override // net.spy.memcached.MemcachedNode
    public final void setupResend() {
        Operation current = getCurrentWriteOp();
        if (current != null) {
            ByteBuffer opBuffer = current.getBuffer();
            if (opBuffer != null) {
                // NOTE(review): ByteBuffer.reset() throws InvalidMarkException
                // when no mark was set; assumes operations mark their buffer
                // -- confirm.
                opBuffer.reset();
            } else {
                getLogger().info("No buffer for current write op, removing");
                removeCurrentWriteOp();
            }
        }
        while (hasReadOp()) {
            Operation stale = removeCurrentReadOp();
            // The current write op may also sit on the read queue (see
            // fillWriteBuffer); it is dequeued here but not cancelled.
            if (stale != getCurrentWriteOp()) {
                getLogger().warn("Discarding partially completed op: %s", stale);
                stale.cancel();
            }
        }
        getWbuf().clear();
        getRbuf().clear();
        toWrite = 0;
    }

    /**
     * Pulls in newly queued operations and discards cancelled ones from the
     * front of the write side.
     *
     * @return true if a non-cancelled write operation is available afterwards
     */
    private boolean preparePending() {
        copyInputQueue();
        Operation next = getCurrentWriteOp();
        while (next != null && next.isCancelled()) {
            getLogger().info("Removing cancelled operation: %s", next);
            removeCurrentWriteOp();
            next = getCurrentWriteOp();
        }
        return next != null;
    }

    /**
     * Copies bytes from pending write operations into the write buffer until
     * the buffer is full or no write operation remains. Does nothing when
     * previously staged bytes are still unwritten or the read queue has no
     * remaining capacity.
     *
     * @param shouldOptimize whether to call {@link #optimize()} each time an
     *        operation has been completely copied
     */
    @Override // net.spy.memcached.MemcachedNode
    public final void fillWriteBuffer(boolean shouldOptimize) {
        if (toWrite != 0 || readQ.remainingCapacity() <= 0) {
            getLogger().debug("Buffer is full, skipping");
            return;
        }
        getWbuf().clear();
        Operation op = getCurrentWriteOp();
        while (op != null && toWrite < getWbuf().capacity()) {
            assert op.getState() == OperationState.WRITING;
            // An operation larger than the buffer is visited on several
            // calls; only enqueue it for reading once.
            if (!readQ.contains(op)) {
                readQ.add(op);
            }
            ByteBuffer opBuffer = op.getBuffer();
            assert opBuffer != null : "Didn't get a write buffer from " + op;
            int copied = Math.min(getWbuf().remaining(), opBuffer.remaining());
            byte[] chunk = new byte[copied];
            opBuffer.get(chunk);
            getWbuf().put(chunk);
            getLogger().debug("After copying stuff from %s: %s", op, getWbuf());
            if (!op.getBuffer().hasRemaining()) {
                // Fully staged: retire it from the write side and move on.
                op.writeComplete();
                transitionWriteItem();
                preparePending();
                if (shouldOptimize) {
                    optimize();
                }
                op = getCurrentWriteOp();
            }
            toWrite += copied;
        }
        getWbuf().flip();
        assert toWrite <= getWbuf().capacity() : "toWrite exceeded capacity: " + this;
        assert toWrite == getWbuf().remaining()
                : "Expected " + toWrite + " remaining, got " + getWbuf().remaining();
    }

    /** Removes the current write operation once it has been fully staged. */
    @Override // net.spy.memcached.MemcachedNode
    public final void transitionWriteItem() {
        Operation finished = removeCurrentWriteOp();
        assert finished != null : "There is no write item to transition";
        getLogger().debug("Finished writing %s", finished);
    }

    /**
     * Hook called from {@link #fillWriteBuffer} after an operation has been
     * fully copied, when optimization was requested.
     */
    protected abstract void optimize();

    /** @return the head of the read queue, or null if it is empty */
    @Override // net.spy.memcached.MemcachedNode
    public final Operation getCurrentReadOp() {
        return readQ.peek();
    }

    /**
     * Removes and returns the head of the read queue.
     *
     * @throws java.util.NoSuchElementException if the read queue is empty
     */
    @Override // net.spy.memcached.MemcachedNode
    public final Operation removeCurrentReadOp() {
        return readQ.remove();
    }

    /**
     * @return {@link #optimizedOp} if set, otherwise the head of the write
     *         queue (null if that is empty)
     */
    @Override // net.spy.memcached.MemcachedNode
    public final Operation getCurrentWriteOp() {
        if (optimizedOp != null) {
            return optimizedOp;
        }
        return writeQ.peek();
    }

    /**
     * Removes and returns the current write operation: clears
     * {@link #optimizedOp} if it is set, otherwise dequeues from the write
     * queue.
     *
     * @throws java.util.NoSuchElementException if no optimized operation is
     *         set and the write queue is empty
     */
    @Override // net.spy.memcached.MemcachedNode
    public final Operation removeCurrentWriteOp() {
        Operation removed = optimizedOp;
        if (removed == null) {
            removed = writeQ.remove();
        } else {
            optimizedOp = null;
        }
        return removed;
    }

    /** @return true if the read queue is not empty */
    @Override // net.spy.memcached.MemcachedNode
    public final boolean hasReadOp() {
        return !readQ.isEmpty();
    }

    /** @return true if an optimized operation is set or the write queue is not empty */
    @Override // net.spy.memcached.MemcachedNode
    public final boolean hasWriteOp() {
        return optimizedOp != null || !writeQ.isEmpty();
    }

    /**
     * Appends an operation to the input queue.
     *
     * @param op the operation to enqueue
     */
    @Override // net.spy.memcached.MemcachedNode
    public final void addOp(Operation op) {
        boolean added = inputQueue.add(op);
        assert added;
    }

    /**
     * Computes the selector interest set for the current state.
     *
     * @return {@link SelectionKey#OP_CONNECT} while the channel is not
     *         connected; otherwise {@link SelectionKey#OP_READ} if a read
     *         operation is queued, combined with {@link SelectionKey#OP_WRITE}
     *         if bytes are staged or a write operation is available
     */
    @Override // net.spy.memcached.MemcachedNode
    public final int getSelectionOps() {
        // NOTE(review): unlike isActive(), this does not guard against a null
        // channel -- confirm callers never reach here without one.
        if (!getChannel().isConnected()) {
            return SelectionKey.OP_CONNECT;
        }
        int ops = 0;
        if (hasReadOp()) {
            ops |= SelectionKey.OP_READ;
        }
        if (toWrite > 0 || hasWriteOp()) {
            ops |= SelectionKey.OP_WRITE;
        }
        return ops;
    }

    /** @return the read buffer */
    @Override // net.spy.memcached.MemcachedNode
    public final ByteBuffer getRbuf() {
        return rbuf;
    }

    /** @return the write buffer */
    @Override // net.spy.memcached.MemcachedNode
    public final ByteBuffer getWbuf() {
        return wbuf;
    }

    /** @return the address supplied at construction */
    @Override // net.spy.memcached.MemcachedNode
    public final SocketAddress getSocketAddress() {
        return socketAddress;
    }

    /**
     * @return true if the reconnect counter is zero and a connected channel
     *         is present
     */
    @Override // net.spy.memcached.MemcachedNode
    public final boolean isActive() {
        return reconnectAttempt == 0 && getChannel() != null && getChannel().isConnected();
    }

    /** Increments the reconnect counter. */
    @Override // net.spy.memcached.MemcachedNode
    public final void reconnecting() {
        // NOTE(review): ++ on a volatile is not atomic; fine if only one
        // thread ever calls this -- confirm.
        reconnectAttempt++;
    }

    /** Resets the reconnect counter to zero. */
    @Override // net.spy.memcached.MemcachedNode
    public final void connected() {
        reconnectAttempt = 0;
    }

    /** @return the current value of the reconnect counter */
    @Override // net.spy.memcached.MemcachedNode
    public final int getReconnectCount() {
        return reconnectAttempt;
    }

    /**
     * @return a diagnostic summary of the queues, the pending byte count and
     *         the selection key's interest set (0 if there is no valid key)
     */
    @Override
    public final String toString() {
        int interested = 0;
        if (getSk() != null && getSk().isValid()) {
            interested = getSk().interestOps();
        }
        // NOTE(review): the optimized op is counted under #Rops even though
        // getCurrentWriteOp() treats it as a write op; kept as-is to preserve
        // the existing output -- confirm whether that is intended.
        int readCount = readQ.size() + (optimizedOp == null ? 0 : 1);
        return "{QA sa=" + getSocketAddress()
                + ", #Rops=" + readCount
                + ", #Wops=" + writeQ.size()
                + ", #iq=" + inputQueue.size()
                + ", topRop=" + getCurrentReadOp()
                + ", topWop=" + getCurrentWriteOp()
                + ", toWrite=" + toWrite
                + ", interested=" + interested + "}";
    }

    /**
     * Installs a channel together with its selection key.
     *
     * @param ch the new channel
     * @param skey the selection key to associate with this node
     */
    @Override // net.spy.memcached.MemcachedNode
    public final void registerChannel(SocketChannel ch, SelectionKey skey) {
        setChannel(ch);
        setSk(skey);
    }

    /**
     * Replaces the channel. With assertions enabled, fails if the existing
     * channel is still open.
     *
     * @param to the new channel
     */
    @Override // net.spy.memcached.MemcachedNode
    public final void setChannel(SocketChannel to) {
        assert channel == null || !channel.isOpen() : "Attempting to overwrite channel";
        channel = to;
    }

    /** @return the current channel */
    @Override // net.spy.memcached.MemcachedNode
    public final SocketChannel getChannel() {
        return channel;
    }

    /** @param to the selection key to store */
    @Override // net.spy.memcached.MemcachedNode
    public final void setSk(SelectionKey to) {
        sk = to;
    }

    /** @return the stored selection key, possibly null */
    @Override // net.spy.memcached.MemcachedNode
    public final SelectionKey getSk() {
        return sk;
    }

    /** @return the number of staged bytes not yet written to the channel */
    @Override // net.spy.memcached.MemcachedNode
    public final int getBytesRemainingToWrite() {
        return toWrite;
    }

    /**
     * Writes as much of the write buffer as the channel accepts and reduces
     * the pending byte count accordingly.
     *
     * @return the number of bytes written by this call
     * @throws IOException if the channel write fails
     */
    @Override // net.spy.memcached.MemcachedNode
    public final int writeSome() throws IOException {
        int written = channel.write(wbuf);
        assert written >= 0 : "Wrote negative bytes?";
        toWrite -= written;
        assert toWrite >= 0
                : "toWrite went negative after writing " + written + " bytes for " + this;
        getLogger().debug("Wrote %d bytes", Integer.valueOf(written));
        return written;
    }

    /**
     * Re-applies {@link #getSelectionOps()} to the selection key, if a valid
     * key is present.
     */
    @Override // net.spy.memcached.MemcachedNode
    public final void fixupOps() {
        // Read the volatile field once so the validity check and the update
        // act on the same key.
        SelectionKey key = sk;
        if (key == null || !key.isValid()) {
            getLogger().debug("Selection key is not valid.");
            return;
        }
        int ops = getSelectionOps();
        getLogger().debug("Setting interested opts to %d", Integer.valueOf(ops));
        key.interestOps(ops);
    }
}
