/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client;

import com.couchbase.client.CouchbaseConnectionFactory;
import com.couchbase.client.ViewNode;
import com.couchbase.client.http.AsyncConnectionManager;
import com.couchbase.client.http.RequeueOpCallback;
import com.couchbase.client.protocol.views.HttpOperation;
import com.couchbase.client.vbucket.Reconfigurable;
import com.couchbase.client.vbucket.config.Bucket;
import com.couchbase.client.vbucket.config.DefaultConfig;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.compat.SpyObject;
import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.nio.NHttpClientHandler;
import org.apache.http.nio.protocol.AsyncNHttpClientHandler;
import org.apache.http.nio.protocol.EventListener;
import org.apache.http.nio.protocol.NHttpRequestExecutionHandler;
import org.apache.http.nio.util.ByteBufferAllocator;
import org.apache.http.nio.util.DirectByteBufferAllocator;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;

public class ViewConnection
extends SpyObject
implements Reconfigurable {
    private static final int NUM_CONNS = 1;
    private static final int MAX_ADDOP_RETRIES = 6;
    private volatile boolean shutDown = false;
    protected volatile boolean reconfiguring = false;
    protected volatile boolean running = true;
    private final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
    private final Lock rlock = this.rwlock.readLock();
    private final Lock wlock = this.rwlock.writeLock();
    private final CouchbaseConnectionFactory connFactory;
    private final Collection<ConnectionObserver> connObservers = new ConcurrentLinkedQueue<ConnectionObserver>();
    private List<ViewNode> couchNodes;
    private int nextNode;

    public ViewConnection(CouchbaseConnectionFactory cf, List<InetSocketAddress> addrs, Collection<ConnectionObserver> obs) throws IOException {
        this.connFactory = cf;
        this.connObservers.addAll(obs);
        this.couchNodes = this.createConnections(addrs);
        this.nextNode = 0;
    }

    private List<ViewNode> createConnections(List<InetSocketAddress> addrs) throws IOException {
        LinkedList<ViewNode> nodeList = new LinkedList<ViewNode>();
        for (InetSocketAddress a : addrs) {
            SyncBasicHttpParams params = new SyncBasicHttpParams();
            params.setIntParameter("http.socket.timeout", 5000).setIntParameter("http.connection.timeout", 5000).setIntParameter("http.socket.buffer-size", 8192).setBooleanParameter("http.connection.stalecheck", false).setBooleanParameter("http.tcp.nodelay", true).setParameter("http.useragent", (Object)"Couchbase Java Client 1.0.2");
            ImmutableHttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[]{new RequestContent(), new RequestTargetHost(), new RequestConnControl(), new RequestUserAgent(), new RequestExpectContinue()});
            AsyncNHttpClientHandler protocolHandler = new AsyncNHttpClientHandler((HttpProcessor)httpproc, (NHttpRequestExecutionHandler)new ViewNode.MyHttpRequestExecutionHandler(this), (ConnectionReuseStrategy)new DefaultConnectionReuseStrategy(), (ByteBufferAllocator)new DirectByteBufferAllocator(), (HttpParams)params);
            protocolHandler.setEventListener((EventListener)new ViewNode.EventLogger());
            AsyncConnectionManager connMgr = new AsyncConnectionManager(new HttpHost(a.getHostName(), a.getPort()), 1, (NHttpClientHandler)protocolHandler, (HttpParams)params, new RequeueOpCallback(this));
            this.getLogger().info("Added %s to connect queue", new Object[]{a.getHostName()});
            ViewNode node = this.connFactory.createViewNode(a, connMgr);
            node.init();
            nodeList.add(node);
        }
        return nodeList;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addOp(HttpOperation op) {
        block8: {
            this.rlock.lock();
            try {
                if (this.couchNodes.isEmpty()) {
                    this.getLogger().error((Object)"No server connections. Cancelling op.");
                    op.cancel();
                    break block8;
                }
                boolean success = false;
                int retries = 0;
                do {
                    if (retries >= 6) {
                        op.cancel();
                        break;
                    }
                    ViewNode node = this.couchNodes.get(this.getNextNode());
                    if (node.isShuttingDown() || !this.hasActiveVBuckets(node)) continue;
                    if (retries > 0) {
                        this.getLogger().debug((Object)("Retrying view operation #" + op.hashCode() + " on node: " + node.getSocketAddress()));
                    }
                    success = node.writeOp(op);
                    if (retries > 0 && success) {
                        this.getLogger().debug((Object)("Successfully wrote #" + op.hashCode() + " on node: " + node.getSocketAddress() + " after " + retries + " retries."));
                    }
                    ++retries;
                } while (!success);
            }
            finally {
                this.rlock.unlock();
            }
        }
    }

    private int getNextNode() {
        ++this.nextNode;
        return this.nextNode %= this.couchNodes.size();
    }

    private boolean hasActiveVBuckets(ViewNode node) {
        DefaultConfig config = (DefaultConfig)this.connFactory.getVBucketConfig();
        return config.nodeHasActiveVBuckets(node.getSocketAddress());
    }

    public List<ViewNode> getConnectedNodes() {
        return this.couchNodes;
    }

    protected void checkState() {
        if (this.shutDown) {
            throw new IllegalStateException("Shutting down");
        }
    }

    public boolean shutdown() throws IOException {
        if (this.shutDown) {
            this.getLogger().info((Object)"Suppressing duplicate attempt to shut down");
            return false;
        }
        this.shutDown = true;
        this.running = false;
        ArrayList<ViewNode> nodesToRemove = new ArrayList<ViewNode>();
        for (ViewNode node : this.couchNodes) {
            if (node == null) continue;
            String hostname = node.getSocketAddress().getHostName();
            if (node.hasWriteOps()) {
                this.getLogger().warn((Object)("Shutting down " + hostname + " with ops waiting to be written"));
            } else {
                this.getLogger().info((Object)("Node " + hostname + " has no ops in the queue"));
            }
            node.shutdown();
            nodesToRemove.add(node);
        }
        for (ViewNode node : nodesToRemove) {
            this.couchNodes.remove((Object)node);
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void reconfigure(Bucket bucket) {
        this.reconfiguring = true;
        try {
            HashSet<InetSocketAddress> newServerAddresses = new HashSet<InetSocketAddress>();
            List newServers = AddrUtil.getAddressesFromURL(bucket.getConfig().getCouchServers());
            for (InetSocketAddress server : newServers) {
                newServerAddresses.add(server);
            }
            ArrayList<ViewNode> shutdownNodes = new ArrayList<ViewNode>();
            ArrayList<ViewNode> stayNodes = new ArrayList<ViewNode>();
            ArrayList<InetSocketAddress> stayServers = new ArrayList<InetSocketAddress>();
            this.wlock.lock();
            try {
                for (ViewNode current : this.couchNodes) {
                    if (newServerAddresses.contains(current.getSocketAddress())) {
                        stayNodes.add(current);
                        stayServers.add(current.getSocketAddress());
                        continue;
                    }
                    shutdownNodes.add(current);
                }
                newServers.removeAll(stayServers);
                List<ViewNode> newNodes = this.createConnections(newServers);
                ArrayList<ViewNode> mergedNodes = new ArrayList<ViewNode>();
                mergedNodes.addAll(stayNodes);
                mergedNodes.addAll(newNodes);
                this.couchNodes = mergedNodes;
            }
            finally {
                this.wlock.unlock();
            }
            for (ViewNode qa : shutdownNodes) {
                try {
                    qa.shutdown();
                }
                catch (IOException e) {
                    this.getLogger().error((Object)("Error shutting down connection to " + qa.getSocketAddress()));
                }
            }
        }
        catch (IOException e) {
            this.getLogger().error((Object)"Connection reconfiguration failed", (Throwable)e);
        }
        finally {
            this.reconfiguring = false;
        }
    }
}

