/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.vbucket;

import com.couchbase.client.http.HttpUtil;
import com.couchbase.client.vbucket.BucketMonitorPipelineFactory;
import com.couchbase.client.vbucket.BucketUpdateResponseHandler;
import com.couchbase.client.vbucket.ConfigurationProviderHTTP;
import com.couchbase.client.vbucket.ConnectionException;
import com.couchbase.client.vbucket.config.Bucket;
import com.couchbase.client.vbucket.config.ConfigurationParser;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.text.ParseException;
import java.util.Observable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import net.spy.memcached.compat.log.Logger;
import net.spy.memcached.compat.log.LoggerFactory;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpVersion;

public class BucketMonitor
extends Observable {
    private final URI cometStreamURI;
    private final String httpUser;
    private final String httpPass;
    private final ChannelFactory factory;
    private volatile Channel channel;
    private final String host;
    private final int port;
    private ConfigurationParser configParser;
    private BucketUpdateResponseHandler handler;
    private final HttpMessageHeaders headers;
    private static final Logger LOGGER = LoggerFactory.getLogger((String)BucketMonitor.class.getName());
    private ClientBootstrap bootstrap;
    private final ConfigurationProviderHTTP provider;

    public BucketMonitor(URI cometStreamURI, String username, String password, ConfigurationParser configParser, ConfigurationProviderHTTP provider) {
        String scheme;
        if (cometStreamURI == null) {
            throw new IllegalArgumentException("cometStreamURI cannot be NULL");
        }
        String string = scheme = cometStreamURI.getScheme() == null ? "http" : cometStreamURI.getScheme();
        if (!scheme.equals("http")) {
            throw new UnsupportedOperationException("Only http is supported.");
        }
        this.cometStreamURI = cometStreamURI;
        this.httpUser = username;
        this.httpPass = password;
        this.configParser = configParser;
        this.host = cometStreamURI.getHost();
        this.port = cometStreamURI.getPort() == -1 ? 80 : cometStreamURI.getPort();
        this.factory = new NioClientSocketChannelFactory((Executor)Executors.newCachedThreadPool(), (Executor)Executors.newCachedThreadPool());
        this.headers = new HttpMessageHeaders();
        this.provider = provider;
    }

    protected void notifyDisconnected() {
        Bucket bucket = this.provider.getBucketConfiguration(this.provider.getBucket());
        bucket.setIsNotUpdating();
        LOGGER.trace((Object)("Marked bucket " + bucket.getName() + " as not updating.  Notifying observers."));
        LOGGER.trace((Object)("There appear to be " + this.countObservers() + " observers waiting for notification"));
        this.setChanged();
        this.notifyObservers();
    }

    public void startMonitor() {
        if (this.channel != null) {
            LOGGER.info((Object)"Bucket monitor is already started.");
            return;
        }
        ChannelFuture channelFuture = this.createChannel();
        final CountDownLatch channelLatch = new CountDownLatch(1);
        channelFuture.addListener(new ChannelFutureListener(){

            public void operationComplete(ChannelFuture cf) throws Exception {
                if (cf.isSuccess()) {
                    BucketMonitor.this.channel = cf.getChannel();
                } else {
                    LOGGER.warn((Object)"Could not start monitor channel because of: ", cf.getCause());
                }
                channelLatch.countDown();
            }
        });
        try {
            channelLatch.await();
        }
        catch (InterruptedException ex) {
            throw new ConnectionException("Interrupted while waiting for streaming connection to arrive.");
        }
        if (this.channel == null) {
            this.bootstrap.releaseExternalResources();
            throw new ConnectionException("Could not establish a streaming connection to " + this.host + ":" + this.port);
        }
        this.handler = (BucketUpdateResponseHandler)this.channel.getPipeline().get(BucketUpdateResponseHandler.class);
        this.handler.setBucketMonitor(this);
        HttpRequest request = this.prepareRequest(this.cometStreamURI, this.host);
        this.channel.write((Object)request);
        try {
            String response = this.handler.getLastResponse();
            LOGGER.debug((Object)("Getting server list returns this last chunked response:\n" + response));
            Bucket bucketToMonitor = this.configParser.parseBucket(response);
            this.setChanged();
            this.notifyObservers(bucketToMonitor);
        }
        catch (ParseException ex) {
            LOGGER.warn((Object)"Invalid client configuration received from server. Staying with existing configuration.", (Throwable)ex);
            LOGGER.debug("Invalid client configuration received:\n", new Object[]{this.handler.getLastResponse()});
        }
    }

    protected ChannelFuture createChannel() {
        this.bootstrap = new ClientBootstrap(this.factory);
        this.bootstrap.setPipelineFactory((ChannelPipelineFactory)new BucketMonitorPipelineFactory());
        return this.bootstrap.connect((SocketAddress)new InetSocketAddress(this.host, this.port));
    }

    protected HttpRequest prepareRequest(URI uri, String h) {
        DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString());
        this.headers.setHeader((HttpRequest)request, "Host", h);
        if (this.getHttpUser() != null && !this.getHttpUser().isEmpty()) {
            try {
                String basicAuthHeader = HttpUtil.buildAuthHeader(this.getHttpUser(), this.getHttpPass());
                this.headers.setHeader((HttpRequest)request, "Authorization", basicAuthHeader);
            }
            catch (UnsupportedEncodingException ex) {
                throw new RuntimeException("Could not encode specified credentials for HTTP request.", ex);
            }
        }
        this.headers.setHeader((HttpRequest)request, "Connection", "close");
        this.headers.setHeader((HttpRequest)request, "Cache-Control", "no-cache");
        this.headers.setHeader((HttpRequest)request, "Accept", "application/json");
        this.headers.setHeader((HttpRequest)request, "User-Agent", "Couchbase Java Client");
        return request;
    }

    public String getHttpUser() {
        return this.httpUser;
    }

    public String getHttpPass() {
        return this.httpPass;
    }

    public void shutdown() {
        this.shutdown(-1L, TimeUnit.MILLISECONDS);
    }

    public void shutdown(long timeout, TimeUnit unit) {
        this.deleteObservers();
        if (this.channel != null) {
            this.channel.close().awaitUninterruptibly(timeout, unit);
        }
        this.factory.releaseExternalResources();
    }

    protected void replaceConfig() {
        try {
            String response = this.handler.getLastResponse();
            Bucket updatedBucket = this.configParser.updateBucket(response, this.provider.getBucketConfiguration(this.provider.getBucket()));
            this.setChanged();
            this.notifyObservers(updatedBucket);
        }
        catch (ParseException e) {
            LOGGER.warn((Object)"Invalid client configuration received from server. Staying with existing configuration.", (Throwable)e);
        }
    }

    public void setConfigParser(ConfigurationParser newConfigParser) {
        this.configParser = newConfigParser;
    }

    private static final class HttpMessageHeaders {
        private final Method m;

        private HttpMessageHeaders() {
            this(HttpMessageHeaders.getHttpMessageHeaderStrategy());
        }

        private HttpMessageHeaders(Method m) {
            this.m = m;
        }

        private static Method getHttpMessageHeaderStrategy() {
            try {
                return HttpRequest.class.getMethod("setHeader", String.class, Object.class);
            }
            catch (SecurityException e) {
                throw new RuntimeException("Cannot check method due to security restrictions.", e);
            }
            catch (NoSuchMethodException e) {
                try {
                    return HttpRequest.class.getMethod("setHeader", String.class, String.class);
                }
                catch (Exception e1) {
                    throw new RuntimeException("No suitable setHeader method found on netty HttpRequest, the signature seems to have changed.", e1);
                }
            }
        }

        void setHeader(HttpRequest obj, String name, String value) {
            try {
                this.m.invoke((Object)obj, name, value);
            }
            catch (Exception e) {
                throw new RuntimeException("Could not invoke method " + this.m + " with args '" + name + "' and '" + value + "'.", e);
            }
        }
    }
}

