/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client;

import com.couchbase.client.CouchbaseConnectionFactory;
import com.couchbase.client.http.HttpResponseCallback;
import com.couchbase.client.http.HttpUtil;
import com.couchbase.client.http.ViewPool;
import com.couchbase.client.protocol.views.HttpOperation;
import com.couchbase.client.vbucket.Reconfigurable;
import com.couchbase.client.vbucket.config.Bucket;
import com.couchbase.client.vbucket.config.DefaultConfig;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import net.spy.memcached.compat.SpyObject;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.NHttpClientEventHandler;
import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncRequester;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.pool.ConnPool;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpCoreContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpProcessorBuilder;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;

public class ViewConnection
extends SpyObject
implements Reconfigurable {
    private static final String SCHEME = "http";
    private final List<HttpHost> viewNodes;
    private final String user;
    private final String password;
    private final ConnectingIOReactor ioReactor;
    private final ViewPool pool;
    private final HttpAsyncRequester requester;
    private volatile Thread reactorThread;
    private volatile int nextNode = 0;
    private volatile boolean running;

    public ViewConnection(CouchbaseConnectionFactory cf, List<InetSocketAddress> seedAddrs, String user, String password) throws IOException {
        this.user = user;
        this.password = password;
        this.viewNodes = Collections.synchronizedList(new ArrayList());
        for (InetSocketAddress addr : seedAddrs) {
            this.viewNodes.add(ViewConnection.createHttpHost(addr.getHostName(), addr.getPort()));
        }
        HttpProcessor httpProc = HttpProcessorBuilder.create().add((HttpRequestInterceptor)new RequestContent()).add((HttpRequestInterceptor)new RequestTargetHost()).add((HttpRequestInterceptor)new RequestConnControl()).add((HttpRequestInterceptor)new RequestUserAgent("JCBC/1.2")).add((HttpRequestInterceptor)new RequestExpectContinue(true)).build();
        this.requester = new HttpAsyncRequester(httpProc);
        this.ioReactor = new DefaultConnectingIOReactor(IOReactorConfig.custom().setConnectTimeout(5000).setSoTimeout(5000).setTcpNoDelay(true).setIoThreadCount(cf.getViewWorkerSize()).build());
        this.pool = new ViewPool(this.ioReactor, ConnectionConfig.DEFAULT);
        this.pool.setDefaultMaxPerRoute(cf.getViewConnsPerNode());
        this.updateMaxTotalRequests();
        this.initializeReactorThread();
    }

    public void addOp(HttpOperation op) {
        if (!this.running) {
            throw new IllegalStateException("Shutting down");
        }
        HttpCoreContext coreContext = HttpCoreContext.create();
        if (this.viewNodes.isEmpty()) {
            this.getLogger().error((Object)"No server connections. Cancelling op.");
            op.cancel();
        } else {
            if (!"default".equals(this.user)) {
                try {
                    op.addAuthHeader(HttpUtil.buildAuthHeader(this.user, this.password));
                }
                catch (UnsupportedEncodingException ex) {
                    this.getLogger().error((Object)("Could not create auth header for request, could not encode credentials into base64. Canceling op." + op), (Throwable)ex);
                    op.cancel();
                    return;
                }
            }
            HttpHost httpHost = this.getNextNode();
            HttpRequest request = op.getRequest();
            request.addHeader("Host", httpHost.toHostString());
            this.requester.execute((HttpAsyncRequestProducer)new BasicAsyncRequestProducer(httpHost, request), (HttpAsyncResponseConsumer)new BasicAsyncResponseConsumer(), (ConnPool)this.pool, (HttpContext)coreContext, (FutureCallback)new HttpResponseCallback(op, this, httpHost));
        }
    }

    public boolean shutdown() throws IOException {
        if (!this.running) {
            this.getLogger().info((Object)"Suppressing duplicate attempt to shut down");
            return false;
        }
        this.running = false;
        this.ioReactor.shutdown();
        try {
            this.reactorThread.join(0L);
        }
        catch (InterruptedException ex) {
            this.getLogger().error((Object)("Interrupt " + ex + " received while waiting for " + "view thread to shut down."));
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void reconfigure(Bucket bucket) {
        DefaultConfig config = (DefaultConfig)bucket.getConfig();
        int sizeBeforeReconfigure = this.viewNodes.size();
        ArrayList<HttpHost> currentViewServers = new ArrayList<HttpHost>();
        for (URL server : bucket.getConfig().getCouchServers()) {
            HttpHost host = ViewConnection.createHttpHost(server.getHost(), server.getPort());
            currentViewServers.add(host);
            if (this.viewNodes.contains(host) || !ViewConnection.hasActiveVBuckets(config, host)) continue;
            this.viewNodes.add(host);
        }
        List<HttpHost> list = this.viewNodes;
        synchronized (list) {
            Iterator<HttpHost> iter = this.viewNodes.iterator();
            while (iter.hasNext()) {
                HttpHost current = iter.next();
                if (currentViewServers.contains(current) && ViewConnection.hasActiveVBuckets(config, current)) continue;
                iter.remove();
                this.pool.closeConnectionsForHost(current);
            }
        }
        if (sizeBeforeReconfigure != this.viewNodes.size()) {
            this.updateMaxTotalRequests();
        }
    }

    private void initializeReactorThread() {
        DefaultHttpClientIODispatch ioEventDispatch = new DefaultHttpClientIODispatch((NHttpClientEventHandler)new HttpAsyncRequestExecutor(), ConnectionConfig.DEFAULT);
        this.reactorThread = new Thread(new Runnable((IOEventDispatch)ioEventDispatch){
            final /* synthetic */ IOEventDispatch val$ioEventDispatch;
            {
                this.val$ioEventDispatch = iOEventDispatch;
            }

            @Override
            public void run() {
                try {
                    ViewConnection.this.ioReactor.execute(this.val$ioEventDispatch);
                }
                catch (InterruptedIOException ex) {
                    ViewConnection.this.getLogger().error((Object)"I/O reactor Interrupted", (Throwable)ex);
                }
                catch (IOException e) {
                    ViewConnection.this.getLogger().error((Object)("I/O error: " + e.getMessage()), (Throwable)e);
                }
                ViewConnection.this.getLogger().info((Object)"I/O reactor terminated");
            }
        }, "Couchbase View Thread");
        this.reactorThread.start();
        this.running = true;
    }

    private void updateMaxTotalRequests() {
        int size = this.viewNodes.size();
        if (size > 0) {
            this.pool.setMaxTotal(this.viewNodes.size() * this.pool.getDefaultMaxPerRoute());
        } else {
            this.getLogger().warn((Object)"No View nodes are present, this could be a bug or no node has vBuckets attached.");
            this.pool.setMaxTotal(1);
        }
    }

    HttpHost getNextNode() {
        HttpHost host = null;
        while (host == null) {
            host = this.viewNodes.get(this.nextNode++ % this.viewNodes.size());
        }
        return host;
    }

    private static HttpHost createHttpHost(String host, int port) {
        return new HttpHost(host, port, SCHEME);
    }

    private static boolean hasActiveVBuckets(DefaultConfig config, HttpHost node) {
        return config.nodeHasActiveVBuckets(new InetSocketAddress(node.getHostName(), node.getPort()));
    }

    List<HttpHost> getConnectedHosts() {
        return this.viewNodes;
    }
}

