/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.AbstractReconnectionHandler;
import com.datastax.driver.core.AuthProvider;
import com.datastax.driver.core.BusyConnectionException;
import com.datastax.driver.core.Configuration;
import com.datastax.driver.core.Connection;
import com.datastax.driver.core.ConnectionException;
import com.datastax.driver.core.ControlConnection;
import com.datastax.driver.core.ConvictionPolicy;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostConnectionPool;
import com.datastax.driver.core.LatencyTracker;
import com.datastax.driver.core.MD5Digest;
import com.datastax.driver.core.Message;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Metrics;
import com.datastax.driver.core.MetricsOptions;
import com.datastax.driver.core.PlainTextAuthProvider;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ProtocolEvent;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.Requests;
import com.datastax.driver.core.Responses;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.ShutdownFuture;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.exceptions.AuthenticationException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.Policies;
import com.datastax.driver.core.policies.ReconnectionPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Cluster {
    private static final Logger logger = LoggerFactory.getLogger(Cluster.class);
    private static final AtomicInteger CLUSTER_ID = new AtomicInteger(0);
    private static final int DEFAULT_THREAD_KEEP_ALIVE = 30;
    final Manager manager;

    /**
     * Creates a cluster handle whose state lives in a freshly built {@link Manager}.
     *
     * @param name the cluster name, passed through to the manager
     * @param contactPoints the addresses passed through to the manager
     * @param configuration the configuration passed through to the manager
     * @param listeners the initial host state listeners passed through to the manager
     */
    private Cluster(String name, List<InetAddress> contactPoints, Configuration configuration, Collection<Host.StateListener> listeners) {
        manager = new Manager(name, contactPoints, configuration, listeners);
    }

    /**
     * Initializes this cluster by delegating to the manager.
     *
     * @return this cluster, to allow call chaining
     */
    public Cluster init() {
        manager.init();
        return this;
    }

    /**
     * Builds a cluster from the values supplied by an {@link Initializer}.
     *
     * @param initializer the source of the cluster name, contact points, configuration and initial listeners
     * @return a new, not yet initialized, cluster
     * @throws IllegalArgumentException if the initializer provides no contact points
     */
    public static Cluster buildFrom(Initializer initializer) {
        List<InetAddress> seeds = initializer.getContactPoints();
        if (!seeds.isEmpty()) {
            String name = initializer.getClusterName();
            Configuration config = initializer.getConfiguration();
            Collection<Host.StateListener> initialListeners = initializer.getInitialListeners();
            return new Cluster(name, seeds, config, initialListeners);
        }
        throw new IllegalArgumentException("Cannot build a cluster without contact points");
    }

    /**
     * Creates a new, empty {@link Builder}.
     *
     * @return a fresh builder instance
     */
    public static Builder builder() {
        Builder fresh = new Builder();
        return fresh;
    }

    /**
     * Opens a new session on this cluster by delegating to the manager.
     *
     * @return the newly created session
     */
    public Session connect() {
        Session opened = manager.newSession();
        return opened;
    }

    /**
     * Opens a new session and sets it to the given keyspace.
     *
     * <p>If setting the keyspace fails, the session that was just created is shut down
     * before the failure is rethrown. The caller never receives a reference to that
     * session, so it could not release it otherwise.
     *
     * @param keyspace the keyspace to set on the new session
     * @return the newly created session, already set to {@code keyspace}
     */
    public Session connect(String keyspace) {
        Session session = this.connect();
        try {
            session.manager.setKeyspace(keyspace);
        }
        catch (RuntimeException e) {
            // Fire-and-forget: the returned shutdown future is not awaited so the
            // original failure reaches the caller without extra delay.
            session.shutdown();
            throw e;
        }
        return session;
    }

    /**
     * Returns the name held by the manager for this cluster.
     *
     * @return the cluster name
     */
    public String getClusterName() {
        return manager.clusterName;
    }

    /**
     * Returns the cluster metadata, running the manager's initialization first.
     *
     * @return the metadata object held by the manager
     */
    public Metadata getMetadata() {
        manager.init();
        return manager.metadata;
    }

    /**
     * Returns the configuration this cluster was created with.
     *
     * @return the configuration held by the manager
     */
    public Configuration getConfiguration() {
        return manager.configuration;
    }

    /**
     * Returns the metrics object held by the manager.
     *
     * @return the metrics, or {@code null} when the configuration had no metrics options
     */
    public Metrics getMetrics() {
        return manager.metrics;
    }

    /**
     * Adds a host state listener to the manager's listener set.
     *
     * @param listener the listener to add
     * @return this cluster, to allow call chaining
     */
    public Cluster register(Host.StateListener listener) {
        manager.listeners.add(listener);
        return this;
    }

    /**
     * Removes a host state listener from the manager's listener set.
     *
     * @param listener the listener to remove
     * @return this cluster, to allow call chaining
     */
    public Cluster unregister(Host.StateListener listener) {
        manager.listeners.remove(listener);
        return this;
    }

    /**
     * Adds a latency tracker to the manager's tracker set.
     *
     * @param tracker the tracker to add
     * @return this cluster, to allow call chaining
     */
    public Cluster register(LatencyTracker tracker) {
        manager.trackers.add(tracker);
        return this;
    }

    /**
     * Removes a latency tracker from the manager's tracker set.
     *
     * @param tracker the tracker to remove
     * @return this cluster, to allow call chaining
     */
    public Cluster unregister(LatencyTracker tracker) {
        manager.trackers.remove(tracker);
        return this;
    }

    /**
     * Starts shutting this cluster down by delegating to the manager.
     *
     * @return the future returned by the manager's shutdown
     */
    public ShutdownFuture shutdown() {
        ShutdownFuture result = manager.shutdown();
        return result;
    }

    /**
     * Builds a thread factory whose threads are named using the given format.
     *
     * @param nameFormat the name format handed to {@link ThreadFactoryBuilder#setNameFormat}
     * @return the built thread factory
     */
    private static ThreadFactory threadFactory(String nameFormat) {
        ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
        factoryBuilder.setNameFormat(nameFormat);
        return factoryBuilder.build();
    }

    /**
     * Computes the time elapsed since a {@link System#nanoTime()} reading.
     *
     * @param startNanos an earlier value obtained from {@code System.nanoTime()}
     * @param destUnit the unit in which to express the result
     * @return the elapsed time converted to {@code destUnit}
     */
    static long timeSince(long startNanos, TimeUnit destUnit) {
        long elapsedNanos = System.nanoTime() - startNanos;
        return destUnit.convert(elapsedNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Generates a default cluster name from the next value of the shared counter.
     *
     * @return a name of the form {@code cluster<N>}
     */
    private static String generateClusterName() {
        int id = CLUSTER_ID.incrementAndGet();
        return "cluster" + id;
    }

    /**
     * Creates a fixed-size thread pool, backed by an unbounded queue, wrapped as a listening executor.
     *
     * <p>Core threads are allowed to time out after the keep-alive period of
     * {@code DEFAULT_THREAD_KEEP_ALIVE} seconds.
     *
     * @param threads the core and maximum pool size
     * @param name the thread name format
     * @return the decorated executor
     */
    private static ListeningExecutorService makeExecutor(int threads, String name) {
        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
        ThreadFactory factory = Cluster.threadFactory(name);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, DEFAULT_THREAD_KEEP_ALIVE, TimeUnit.SECONDS, workQueue, factory);
        pool.allowCoreThreadTimeOut(true);
        return MoreExecutors.listeningDecorator((ExecutorService)pool);
    }

    /**
     * Compiler-generated accessor that lets the nested {@link Manager} reach the private
     * {@code threadFactory} helper.
     *
     * @param x0 the thread name format
     * @return the thread factory built for that name format
     */
    static /* synthetic */ ThreadFactory access$400(String x0) {
        ThreadFactory factory = threadFactory(x0);
        return factory;
    }

    class Manager
    implements Host.StateListener,
    Connection.DefaultResponseHandler {
        final String clusterName;
        private final AtomicBoolean isInit = new AtomicBoolean(false);
        final List<InetAddress> contactPoints;
        final Set<Session> sessions = new CopyOnWriteArraySet<Session>();
        final Metadata metadata;
        final Configuration configuration;
        final Metrics metrics;
        final Connection.Factory connectionFactory;
        final ControlConnection controlConnection;
        final ConvictionPolicy.Factory convictionPolicyFactory = new ConvictionPolicy.Simple.Factory();
        final ScheduledExecutorService reconnectionExecutor = Executors.newScheduledThreadPool(2, Cluster.access$400("Reconnection-%d"));
        final ScheduledExecutorService scheduledTasksExecutor = Executors.newScheduledThreadPool(1, Cluster.access$400("Scheduled Tasks-%d"));
        final ListeningExecutorService executor;
        final ListeningExecutorService blockingTasksExecutor;
        final AtomicReference<ShutdownFuture> shutdownFuture = new AtomicReference();
        final Map<MD5Digest, PreparedStatement> preparedQueries = new MapMaker().weakKeys().weakValues().makeMap();
        final Set<Host.StateListener> listeners;
        final Set<LatencyTracker> trackers = new CopyOnWriteArraySet<LatencyTracker>();

        /**
         * Wires up the manager: executors, metadata, connection factory, control connection,
         * optional metrics and the initial listener set.
         *
         * <p>The assignment order is kept as is because several collaborators are handed
         * {@code this} while the manager is still being constructed.
         *
         * @param clusterName the cluster name, or {@code null} to generate one
         * @param contactPoints the addresses added as hosts on initialization
         * @param configuration the cluster configuration; this manager registers itself with it
         * @param listeners the initial host state listeners, copied into a concurrent set
         */
        private Manager(String clusterName, List<InetAddress> contactPoints, Configuration configuration, Collection<Host.StateListener> listeners) {
            logger.debug("Starting new cluster with contact points " + contactPoints);
            if (clusterName != null) {
                this.clusterName = clusterName;
            } else {
                this.clusterName = Cluster.generateClusterName();
            }
            int workerThreads = Runtime.getRuntime().availableProcessors();
            this.executor = Cluster.makeExecutor(workerThreads, "Cassandra Java Driver worker-%d");
            this.blockingTasksExecutor = Cluster.makeExecutor(2, "Cassandra Java Driver blocking tasks worker-%d");
            this.configuration = configuration;
            this.metadata = new Metadata(this);
            this.contactPoints = contactPoints;
            this.connectionFactory = new Connection.Factory(this, configuration.getProtocolOptions().getAuthProvider());
            this.controlConnection = new ControlConnection(this);
            if (configuration.getMetricsOptions() != null) {
                this.metrics = new Metrics(this);
            } else {
                this.metrics = null;
            }
            this.configuration.register(this);
            this.listeners = new CopyOnWriteArraySet<Host.StateListener>(listeners);
        }

        /**
         * Performs one-time initialization: adds the contact points as hosts, initializes the
         * load balancing policy and connects the control connection.
         *
         * <p>Only the first call does any work; later calls return immediately. If the control
         * connection finds no host available, the manager is shut down and the exception rethrown.
         */
        private void init() {
            boolean firstCall = isInit.compareAndSet(false, true);
            if (!firstCall) {
                return;
            }
            for (InetAddress contactPoint : contactPoints) {
                Host added = addHost(contactPoint, false);
                if (added != null) {
                    added.setUp();
                    for (Host.StateListener registered : listeners) {
                        registered.onAdd(added);
                    }
                }
            }
            loadBalancingPolicy().init(Cluster.this, metadata.allHosts());
            try {
                controlConnection.connect();
            }
            catch (NoHostAvailableException noHost) {
                shutdown();
                throw noHost;
            }
        }

        /**
         * Returns the cluster instance that owns this manager.
         *
         * @return the enclosing {@link Cluster}
         */
        Cluster getCluster() {
            Cluster owner = Cluster.this;
            return owner;
        }

        /**
         * Shortcut for the load balancing policy found in the configuration's policies.
         *
         * @return the configured load balancing policy
         */
        LoadBalancingPolicy loadBalancingPolicy() {
            Policies policies = configuration.getPolicies();
            return policies.getLoadBalancingPolicy();
        }

        /**
         * Shortcut for the reconnection policy found in the configuration's policies.
         *
         * @return the configured reconnection policy
         */
        ReconnectionPolicy reconnectionPolicy() {
            Policies policies = configuration.getPolicies();
            return policies.getReconnectionPolicy();
        }

        /**
         * Creates a session over all hosts currently in the metadata and records it in this
         * manager's session set. Initialization is run first.
         *
         * @return the newly created session
         */
        private Session newSession() {
            init();
            Session created = new Session(Cluster.this, metadata.allHosts());
            sessions.add(created);
            return created;
        }

        /**
         * Forwards a latency measurement to every registered {@link LatencyTracker}.
         *
         * @param host the host the measurement is for
         * @param latencyNanos the measured latency, in nanoseconds
         */
        void reportLatency(Host host, long latencyNanos) {
            for (LatencyTracker registered : trackers) {
                registered.update(host, latencyNanos);
            }
        }

        /**
         * Tells whether a shutdown has been recorded for this manager.
         *
         * @return {@code true} once a shutdown future has been set
         */
        boolean isShutdown() {
            ShutdownFuture current = shutdownFuture.get();
            return current != null;
        }

        /**
         * Starts an orderly shutdown of the executors, the metrics (if any), the control
         * connection and every session.
         *
         * <p>If a shutdown future is already recorded it is returned as is. Otherwise a new
         * future aggregating the control connection and session shutdowns is created and
         * published with a compare-and-set; if another future was published first, that one
         * is returned instead.
         *
         * <p>NOTE(review): {@code blockingTasksExecutor} is not shut down here; confirm that
         * this is intentional.
         *
         * @return the future tracking the cluster shutdown
         */
        private ShutdownFuture shutdown() {
            ShutdownFuture existing = shutdownFuture.get();
            if (existing != null) {
                return existing;
            }
            logger.debug("Shutting down");
            reconnectionExecutor.shutdown();
            scheduledTasksExecutor.shutdown();
            executor.shutdown();
            if (metrics != null) {
                metrics.shutdown();
            }
            ArrayList<ShutdownFuture> pending = new ArrayList<ShutdownFuture>(sessions.size() + 1);
            pending.add(controlConnection.shutdown());
            for (Session open : sessions) {
                pending.add(open.shutdown());
            }
            ShutdownFuture created = new ClusterShutdownFuture(pending);
            if (shutdownFuture.compareAndSet(null, created)) {
                return created;
            }
            return shutdownFuture.get();
        }

        /**
         * Handles a host coming back up.
         *
         * <p>Does nothing if the cluster is shut down or the host is already marked up.
         * Otherwise it cancels any pending reconnection attempt, re-prepares the known
         * prepared statements on the host, drops the sessions' existing pools for it,
         * notifies the load balancing policy and the control connection, and asks every
         * session to create or renew its pool. The host is marked up, and the registered
         * listeners notified, only once every pool creation has reported success.
         *
         * @param host the host that is reported up
         */
        @Override
        public void onUp(final Host host) {
            logger.trace("Host {} is UP", (Object)host);
            if (this.isShutdown()) {
                return;
            }
            if (host.isUp()) {
                return;
            }
            // Take ownership of any scheduled reconnection attempt and cancel it.
            ScheduledFuture scheduledAttempt = host.reconnectionAttempt.getAndSet(null);
            if (scheduledAttempt != null) {
                logger.debug("Cancelling reconnection attempt since node is UP");
                scheduledAttempt.cancel(false);
            }
            try {
                this.prepareAllQueries(host);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag and carry on with the rest of the processing.
                Thread.currentThread().interrupt();
            }
            for (Session s : this.sessions) {
                s.manager.removePool(host);
            }
            this.loadBalancingPolicy().onUp(host);
            this.controlConnection.onUp(host);
            ArrayList<ListenableFuture<Boolean>> futures = new ArrayList<ListenableFuture<Boolean>>(this.sessions.size());
            for (Session s : this.sessions) {
                futures.add(s.manager.addOrRenewPool(host, false));
            }
            // Runs once every session has reported the outcome of its pool creation.
            Futures.addCallback((ListenableFuture)Futures.allAsList(futures), (FutureCallback)new FutureCallback<List<Boolean>>(){

                public void onSuccess(List<Boolean> poolCreationResults) {
                    // A single failed pool creation is enough to leave the host not marked up.
                    if (Iterables.any(poolCreationResults, (Predicate)Predicates.equalTo((Object)false))) {
                        logger.debug("Connection pool cannot be created, not marking {} UP", (Object)host);
                        return;
                    }
                    host.setUp();
                    for (Host.StateListener listener : Manager.this.listeners) {
                        listener.onUp(host);
                    }
                    for (Session s : Manager.this.sessions) {
                        s.manager.updateCreatedPools();
                    }
                }

                public void onFailure(Throwable t) {
                    // Interruptions are not logged; anything else is unexpected.
                    if (!(t instanceof InterruptedException)) {
                        logger.error("Unexpected error while marking node UP: while this shouldn't happen, this shouldn't be critical", t);
                    }
                }
            });
        }

        /**
         * Handles a host going down when the event is not part of a host addition.
         *
         * @param host the host that is reported down
         */
        @Override
        public void onDown(Host host) {
            onDown(host, false);
        }

        /**
         * Marks a host as down, notifies interested parties and starts reconnection attempts.
         *
         * <p>Does nothing if the cluster is shut down or if a reconnection attempt is already
         * recorded for the host. Registered listeners are only notified when the host was
         * previously up.
         *
         * @param host the host that is reported down
         * @param isHostAddition whether a successful reconnection should be handled as a host
         *        addition ({@code onAdd}) rather than a host coming back up ({@code onUp})
         */
        public void onDown(final Host host, final boolean isHostAddition) {
            logger.trace("Host {} is DOWN", (Object)host);
            if (this.isShutdown()) {
                return;
            }
            // A non-null attempt means reconnection is already in progress for this host.
            if (host.reconnectionAttempt.get() != null) {
                return;
            }
            boolean wasUp = host.isUp();
            host.setDown();
            this.loadBalancingPolicy().onDown(host);
            this.controlConnection.onDown(host);
            for (Session s : this.sessions) {
                s.manager.onDown(host);
            }
            if (wasUp) {
                for (Host.StateListener listener : this.listeners) {
                    listener.onDown(host);
                }
            }
            logger.debug("{} is down, scheduling connection retries", (Object)host);
            new AbstractReconnectionHandler(this.reconnectionExecutor, this.reconnectionPolicy().newSchedule(), host.reconnectionAttempt){

                @Override
                protected Connection tryReconnect() throws ConnectionException, InterruptedException {
                    return Manager.this.connectionFactory.open(host);
                }

                @Override
                protected void onReconnection(Connection connection) {
                    logger.debug("Successful reconnection to {}, setting host UP", (Object)host);
                    if (isHostAddition) {
                        Manager.this.onAdd(host);
                    } else {
                        Manager.this.onUp(host);
                    }
                }

                @Override
                protected boolean onConnectionException(ConnectionException e, long nextDelayMs) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Failed reconnection to {} ({}), scheduling retry in {} milliseconds", new Object[]{host, e.getMessage(), nextDelayMs});
                    }
                    return true;
                }

                @Override
                protected boolean onUnknownException(Exception e, long nextDelayMs) {
                    // This handler reconnects to a regular host, not the control connection,
                    // so the message names the host that is being retried.
                    logger.error(String.format("Unknown error during reconnection to %s, scheduling retry in %d milliseconds", host, nextDelayMs), (Throwable)e);
                    return true;
                }
            }.start();
        }

        /**
         * Handles a host being added to the cluster.
         *
         * <p>Does nothing if the cluster is shut down. Otherwise it prepares the known
         * prepared statements on the host, notifies the load balancing policy and the
         * control connection, and asks every session to create or renew its pool. The host
         * is marked up, and the registered listeners notified, only once every pool
         * creation has reported success.
         *
         * @param host the host that was added
         */
        @Override
        public void onAdd(final Host host) {
            logger.trace("Adding new host {}", (Object)host);
            if (this.isShutdown()) {
                return;
            }
            try {
                this.prepareAllQueries(host);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag and carry on with the rest of the processing.
                Thread.currentThread().interrupt();
            }
            this.loadBalancingPolicy().onAdd(host);
            this.controlConnection.onAdd(host);
            ArrayList<ListenableFuture<Boolean>> futures = new ArrayList<ListenableFuture<Boolean>>(this.sessions.size());
            for (Session s : this.sessions) {
                futures.add(s.manager.addOrRenewPool(host, true));
            }
            // Runs once every session has reported the outcome of its pool creation.
            Futures.addCallback((ListenableFuture)Futures.allAsList(futures), (FutureCallback)new FutureCallback<List<Boolean>>(){

                public void onSuccess(List<Boolean> poolCreationResults) {
                    // A single failed pool creation is enough to leave the host not marked up.
                    if (Iterables.any(poolCreationResults, (Predicate)Predicates.equalTo((Object)false))) {
                        logger.debug("Connection pool cannot be created, not marking {} UP", (Object)host);
                        return;
                    }
                    host.setUp();
                    for (Host.StateListener listener : Manager.this.listeners) {
                        listener.onAdd(host);
                    }
                    for (Session s : Manager.this.sessions) {
                        s.manager.updateCreatedPools();
                    }
                }

                public void onFailure(Throwable t) {
                    // Interruptions are not logged; anything else is unexpected.
                    if (!(t instanceof InterruptedException)) {
                        logger.error("Unexpected error while adding node: while this shouldn't happen, this shouldn't be critical", t);
                    }
                }
            });
        }

        /**
         * Handles a host being removed: marks it down, then notifies the load balancing
         * policy, the control connection, every session and every registered listener.
         * Does nothing if the cluster is shut down.
         *
         * @param host the host that was removed
         */
        @Override
        public void onRemove(Host host) {
            if (isShutdown()) {
                return;
            }
            host.setDown();
            logger.trace("Removing host {}", (Object)host);
            loadBalancingPolicy().onRemove(host);
            controlConnection.onRemove(host);
            for (Session open : sessions) {
                open.manager.onRemove(host);
            }
            for (Host.StateListener registered : listeners) {
                registered.onRemove(host);
            }
        }

        /**
         * Reports a connection failure to the host and, if the host reports itself down as a
         * result, runs the down handling.
         *
         * @param host the host the failure occurred on
         * @param exception the connection failure
         * @param isHostAddition forwarded to {@code onDown(Host, boolean)}
         * @return the value returned by {@code host.signalConnectionFailure(exception)}
         */
        public boolean signalConnectionFailure(Host host, ConnectionException exception, boolean isHostAddition) {
            if (!host.signalConnectionFailure(exception)) {
                return false;
            }
            onDown(host, isHostAddition);
            return true;
        }

        /**
         * Adds an address to the metadata and optionally triggers the host-added handling.
         *
         * @param address the address of the host to add
         * @param signal whether to log the addition and call {@code onAdd} when a host is returned
         * @return whatever {@code metadata.add(address)} returned, possibly {@code null}
         */
        public Host addHost(InetAddress address, boolean signal) {
            Host added = metadata.add(address);
            if (added == null || !signal) {
                return added;
            }
            logger.info("New Cassandra host {} added", (Object)added);
            onAdd(added);
            return added;
        }

        /**
         * Removes a host from the metadata and, when the metadata reports the removal,
         * logs it and triggers the host-removed handling.
         *
         * @param host the host to remove; {@code null} is ignored
         */
        public void removeHost(Host host) {
            if (host != null && metadata.remove(host)) {
                logger.info("Cassandra host {} removed", (Object)host);
                onRemove(host);
            }
        }

        /**
         * Calls {@code ensureCoreConnections()} on every connection pool of every session.
         */
        public void ensurePoolsSizing() {
            for (Session open : sessions) {
                for (HostConnectionPool hostPool : open.manager.pools.values()) {
                    hostPool.ensureCoreConnections();
                }
            }
        }

        /**
         * Records a prepared statement under its digest and asks every session to prepare
         * its query string.
         *
         * @param digest the key under which the statement is recorded
         * @param stmt the prepared statement
         * @param toExclude an address forwarded to each session's prepare call
         * @throws InterruptedException if a session's prepare call is interrupted
         */
        public void prepare(MD5Digest digest, PreparedStatement stmt, InetAddress toExclude) throws InterruptedException {
            preparedQueries.put(digest, stmt);
            for (Session open : sessions) {
                open.manager.prepare(stmt.getQueryString(), toExclude);
            }
        }

        /**
         * Re-prepares every known prepared statement on the given host, on a best-effort basis.
         *
         * <p>A dedicated connection is opened to the host and closed when done. The query
         * strings are grouped by keyspace; for each non-empty keyspace the connection is
         * switched to it before its queries are sent. Failures to connect, authenticate or
         * write, as well as per-query errors, are logged at debug level and otherwise ignored:
         * a host that could not be prepared is still usable.
         *
         * @param host the new or newly up host to prepare the queries on
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        private void prepareAllQueries(Host host) throws InterruptedException {
            if (this.preparedQueries.isEmpty()) {
                return;
            }
            logger.debug("Preparing {} prepared queries on newly up node {}", (Object)this.preparedQueries.size(), (Object)host);
            try {
                Connection connection = this.connectionFactory.open(host);
                try {
                    try {
                        ControlConnection.waitForSchemaAgreement(connection, this.metadata);
                    }
                    catch (ExecutionException e) {
                        // Not fatal: the prepare requests below are still attempted.
                        logger.debug("Error while waiting for schema agreement before preparing queries on {}", (Object)host, (Object)e);
                    }
                    // Typed multimap: query strings grouped by keyspace ("" when the statement has none).
                    HashMultimap<String, String> perKeyspace = HashMultimap.create();
                    for (PreparedStatement ps : this.preparedQueries.values()) {
                        String keyspace = ps.getQueryKeyspace() == null ? "" : ps.getQueryKeyspace();
                        perKeyspace.put(keyspace, ps.getQueryString());
                    }
                    for (String keyspace : perKeyspace.keySet()) {
                        if (!keyspace.isEmpty()) {
                            connection.setKeyspace(keyspace);
                        }
                        // Send all the requests for this keyspace first, then wait for the answers.
                        ArrayList<Connection.Future> futures = new ArrayList<Connection.Future>(this.preparedQueries.size());
                        for (String query : perKeyspace.get(keyspace)) {
                            futures.add(connection.write(new Requests.Prepare(query)));
                        }
                        for (Connection.Future future : futures) {
                            try {
                                future.get();
                            }
                            catch (ExecutionException e) {
                                logger.debug("Unexpected error while preparing queries on new/newly up host", (Throwable)e);
                            }
                        }
                    }
                }
                finally {
                    connection.close();
                }
            }
            catch (ConnectionException e) {
                // Best effort only: leave a trace instead of failing the caller.
                logger.debug("Connection error while preparing queries on {}, skipping", (Object)host, (Object)e);
            }
            catch (AuthenticationException e) {
                // Best effort only: leave a trace instead of failing the caller.
                logger.debug("Authentication error while preparing queries on {}, skipping", (Object)host, (Object)e);
            }
            catch (BusyConnectionException e) {
                // Best effort only: leave a trace instead of failing the caller.
                logger.debug("Connection busy while preparing queries on {}, skipping", (Object)host, (Object)e);
            }
        }

        /**
         * Submits a task to the worker executor that refreshes the schema through the
         * control connection.
         *
         * @param keyspace forwarded to the control connection's {@code refreshSchema}
         * @param table forwarded to the control connection's {@code refreshSchema}
         */
        public void submitSchemaRefresh(final String keyspace, final String table) {
            logger.trace("Submitting schema refresh");
            Runnable refreshTask = new Runnable(){

                @Override
                public void run() {
                    try {
                        Manager.this.controlConnection.refreshSchema(keyspace, table);
                    }
                    catch (InterruptedException interrupted) {
                        // Preserve the interrupt status for the executor thread.
                        Thread.currentThread().interrupt();
                    }
                }
            };
            executor.submit(refreshTask);
        }

        /**
         * Submits a task that waits for schema agreement, refreshes the schema over the
         * given connection and then completes {@code future} with {@code rs}.
         *
         * <p>The future is completed whether or not the refresh succeeds. On failure the
         * error is logged and another refresh is submitted through
         * {@code submitSchemaRefresh}.
         *
         * <p>NOTE(review): the broad catch would also take an {@code InterruptedException}
         * without restoring the interrupt flag; confirm whether that matters here.
         *
         * @param connection the connection used for the agreement check and the refresh
         * @param future the result future to complete
         * @param rs the result set to complete the future with
         * @param keyspace the keyspace to refresh, may be {@code null}
         * @param table the table to refresh, may be {@code null}
         */
        public void refreshSchema(final Connection connection, final ResultSetFuture future, final ResultSet rs, final String keyspace, final String table) {
            if (logger.isDebugEnabled()) {
                String keyspacePart = keyspace == null ? "" : keyspace;
                String tablePart = table == null ? "" : "." + table;
                logger.debug("Refreshing schema for {}{}", (Object)keyspacePart, (Object)tablePart);
            }
            Runnable refreshTask = new Runnable(){

                @Override
                public void run() {
                    try {
                        boolean agreed = ControlConnection.waitForSchemaAgreement(connection, Manager.this.metadata);
                        if (!agreed) {
                            logger.warn("No schema agreement from live replicas after {} ms. The schema may not be up to date on some nodes.", (Object)10000L);
                        }
                        ControlConnection.refreshSchema(connection, keyspace, table, Manager.this);
                    }
                    catch (Exception failure) {
                        logger.error("Error during schema refresh ({}). The schema from Cluster.getMetadata() might appear stale. Asynchronously submitting job to fix.", (Object)failure.getMessage());
                        Manager.this.submitSchemaRefresh(keyspace, table);
                    }
                    finally {
                        // Always release the caller waiting on the result.
                        future.setResult(rs);
                    }
                }
            };
            executor.submit(refreshTask);
        }

        /**
         * Handles a message pushed by the server on the control connection.
         *
         * <p>Anything other than an event is logged as an error and dropped. Events are
         * processed on the scheduled tasks executor, after the delay (in seconds) returned
         * by {@code delayForEvent}.
         *
         * @param response the message received from the server
         */
        @Override
        public void handle(Message.Response response) {
            if (!(response instanceof Responses.Event)) {
                logger.error("Received an unexpected message from the server: {}", (Object)response);
                return;
            }
            final ProtocolEvent event = ((Responses.Event)response).event;
            logger.debug("Received event {}, scheduling delivery", (Object)response);
            this.scheduledTasksExecutor.schedule(new Runnable(){

                @Override
                public void run() {
                    block0 : switch (event.type) {
                        case TOPOLOGY_CHANGE: {
                            // Node added, removed or moved.
                            ProtocolEvent.TopologyChange tpc = (ProtocolEvent.TopologyChange)event;
                            switch (tpc.change) {
                                case NEW_NODE: {
                                    Manager.this.addHost(tpc.node.getAddress(), true);
                                    break;
                                }
                                case REMOVED_NODE: {
                                    Manager.this.removeHost(Manager.this.metadata.getHost(tpc.node.getAddress()));
                                    break;
                                }
                                case MOVED_NODE: {
                                    Manager.this.controlConnection.refreshNodeListAndTokenMap();
                                }
                            }
                            break;
                        }
                        case STATUS_CHANGE: {
                            ProtocolEvent.StatusChange stc = (ProtocolEvent.StatusChange)event;
                            switch (stc.status) {
                                case UP: {
                                    Host hostUp = Manager.this.metadata.getHost(stc.node.getAddress());
                                    // A host not yet in the metadata is handled as a new host.
                                    if (hostUp == null) {
                                        Manager.this.addHost(stc.node.getAddress(), true);
                                        break;
                                    }
                                    Manager.this.onUp(hostUp);
                                    break;
                                }
                                case DOWN: {
                                    Host hostDown = Manager.this.metadata.getHost(stc.node.getAddress());
                                    // Nothing to do for a host that is not in the metadata.
                                    if (hostDown == null) break;
                                    Manager.this.onDown(hostDown);
                                }
                            }
                            break;
                        }
                        case SCHEMA_CHANGE: {
                            // An empty table name is treated as a keyspace-level change.
                            ProtocolEvent.SchemaChange scc = (ProtocolEvent.SchemaChange)event;
                            switch (scc.change) {
                                case CREATED: {
                                    if (scc.table.isEmpty()) {
                                        Manager.this.submitSchemaRefresh(null, null);
                                        break block0;
                                    }
                                    Manager.this.submitSchemaRefresh(scc.keyspace, null);
                                    break block0;
                                }
                                case DROPPED: {
                                    if (scc.table.isEmpty()) {
                                        Manager.this.submitSchemaRefresh(null, null);
                                        break block0;
                                    }
                                    Manager.this.submitSchemaRefresh(scc.keyspace, null);
                                    break block0;
                                }
                                case UPDATED: {
                                    if (scc.table.isEmpty()) {
                                        Manager.this.submitSchemaRefresh(scc.keyspace, null);
                                        break block0;
                                    }
                                    Manager.this.submitSchemaRefresh(scc.keyspace, scc.table);
                                }
                            }
                        }
                    }
                }
            }, (long)this.delayForEvent(event), TimeUnit.SECONDS);
        }

        /**
         * Returns how long to wait before processing an event.
         *
         * @param event the event about to be scheduled
         * @return 1 for topology changes and for status changes to UP, 0 otherwise;
         *         {@code handle} uses the value as a number of seconds
         */
        private int delayForEvent(ProtocolEvent event) {
            switch (event.type) {
                case TOPOLOGY_CHANGE:
                    return 1;
                case STATUS_CHANGE:
                    ProtocolEvent.StatusChange statusChange = (ProtocolEvent.StatusChange)event;
                    return statusChange.status == ProtocolEvent.StatusChange.Status.UP ? 1 : 0;
                default:
                    return 0;
            }
        }

        /**
         * Shutdown future for the whole cluster, forwarding to the control connection and
         * session shutdown futures it is built from.
         *
         * <p>Once those futures are done, a dedicated thread shuts the connection factory
         * down and waits for the reconnection, scheduled tasks and worker executors to
         * terminate before completing this future.
         */
        private class ClusterShutdownFuture
        extends ShutdownFuture.Forwarding {
            ClusterShutdownFuture(List<ShutdownFuture> futures) {
                super(futures);
            }

            /**
             * Forces the shutdown: stops the three executors immediately, then forces the
             * underlying futures.
             */
            @Override
            public ShutdownFuture force() {
                Manager.this.reconnectionExecutor.shutdownNow();
                Manager.this.scheduledTasksExecutor.shutdownNow();
                Manager.this.executor.shutdownNow();
                return super.force();
            }

            @Override
            protected void onFuturesDone() {
                Thread checker = new Thread("Shutdown-checker"){

                    @Override
                    public void run() {
                        Manager.this.connectionFactory.shutdown();
                        try {
                            // Awaited in the same order as they are shut down.
                            ExecutorService[] pools = new ExecutorService[]{Manager.this.reconnectionExecutor, Manager.this.scheduledTasksExecutor, Manager.this.executor};
                            for (ExecutorService pool : pools) {
                                pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
                            }
                            ClusterShutdownFuture.this.set(null);
                        }
                        catch (InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                            ClusterShutdownFuture.this.setException(interrupted);
                        }
                    }
                };
                checker.start();
            }
        }
    }

    /**
     * Fluent collector of cluster settings that exposes them through the
     * {@link Initializer} contract and hands itself to
     * {@code Cluster.buildFrom} in {@link #build()}.
     *
     * <p>Defaults visible in this class: port {@code 9042}, {@code AuthProvider.NONE},
     * no compression, no SSL, metrics and JMX reporting enabled. Policies and the
     * pooling/socket/query options that are left unset are replaced by defaults
     * when {@link #getConfiguration()} is called.
     */
    public static class Builder
    implements Initializer {
        private String clusterName;
        private final List<InetAddress> addresses = new ArrayList<InetAddress>();
        private int port = 9042;
        private AuthProvider authProvider = AuthProvider.NONE;
        private LoadBalancingPolicy loadBalancingPolicy;
        private ReconnectionPolicy reconnectionPolicy;
        private RetryPolicy retryPolicy;
        private ProtocolOptions.Compression compression = ProtocolOptions.Compression.NONE;
        private SSLOptions sslOptions = null;
        private boolean metricsEnabled = true;
        private boolean jmxEnabled = true;
        private PoolingOptions poolingOptions;
        private SocketOptions socketOptions;
        private QueryOptions queryOptions;
        private Collection<Host.StateListener> listeners;

        /**
         * @return the name set through {@link #withClusterName(String)}, or
         *         {@code null} if none was set
         */
        @Override
        public String getClusterName() {
            return this.clusterName;
        }

        /**
         * @return the builder's own (live, mutable) list of contact points
         */
        @Override
        public List<InetAddress> getContactPoints() {
            return this.addresses;
        }

        /**
         * Sets the cluster name.
         *
         * @param name the name to report from {@link #getClusterName()}
         * @return this builder
         */
        public Builder withClusterName(String name) {
            this.clusterName = name;
            return this;
        }

        /**
         * Sets the port handed to {@code ProtocolOptions} (default {@code 9042}).
         *
         * @param port the port to use
         * @return this builder
         */
        public Builder withPort(int port) {
            this.port = port;
            return this;
        }

        /**
         * Resolves {@code address} with {@link InetAddress#getByName(String)} and
         * adds the result to the contact points.
         *
         * @param address host name or textual IP address of the contact point
         * @return this builder
         * @throws IllegalArgumentException if the address cannot be resolved; the
         *         originating {@link UnknownHostException} is attached as the cause
         */
        public Builder addContactPoint(String address) {
            try {
                this.addresses.add(InetAddress.getByName(address));
                return this;
            }
            catch (UnknownHostException e) {
                // Keep the original message but chain the cause so the stack trace is not lost.
                throw new IllegalArgumentException(e.getMessage(), e);
            }
        }

        /**
         * Adds several contact points, resolving each one as
         * {@link #addContactPoint(String)} does.
         *
         * @param addresses host names or textual IP addresses
         * @return this builder
         * @throws IllegalArgumentException if any address cannot be resolved
         */
        public Builder addContactPoints(String ... addresses) {
            for (String address : addresses) {
                this.addContactPoint(address);
            }
            return this;
        }

        /**
         * Adds already-resolved contact points.
         *
         * @param addresses the addresses to append, in order
         * @return this builder
         */
        public Builder addContactPoints(InetAddress ... addresses) {
            Collections.addAll(this.addresses, addresses);
            return this;
        }

        /**
         * Sets the load balancing policy; when left unset,
         * {@code Policies.defaultLoadBalancingPolicy()} is used.
         *
         * @param policy the policy to use
         * @return this builder
         */
        public Builder withLoadBalancingPolicy(LoadBalancingPolicy policy) {
            this.loadBalancingPolicy = policy;
            return this;
        }

        /**
         * Sets the reconnection policy; when left unset,
         * {@code Policies.defaultReconnectionPolicy()} is used.
         *
         * @param policy the policy to use
         * @return this builder
         */
        public Builder withReconnectionPolicy(ReconnectionPolicy policy) {
            this.reconnectionPolicy = policy;
            return this;
        }

        /**
         * Sets the retry policy; when left unset,
         * {@code Policies.defaultRetryPolicy()} is used.
         *
         * @param policy the policy to use
         * @return this builder
         */
        public Builder withRetryPolicy(RetryPolicy policy) {
            this.retryPolicy = policy;
            return this;
        }

        /**
         * Replaces the auth provider with a {@code PlainTextAuthProvider} built
         * from the given credentials.
         *
         * @param username the user name
         * @param password the password
         * @return this builder
         */
        public Builder withCredentials(String username, String password) {
            this.authProvider = new PlainTextAuthProvider(username, password);
            return this;
        }

        /**
         * Replaces the auth provider (default {@code AuthProvider.NONE}).
         *
         * @param authProvider the provider to use
         * @return this builder
         */
        public Builder withAuthProvider(AuthProvider authProvider) {
            this.authProvider = authProvider;
            return this;
        }

        /**
         * Sets the compression applied to the protocol options
         * (default {@code ProtocolOptions.Compression.NONE}).
         *
         * @param compression the compression to use
         * @return this builder
         */
        public Builder withCompression(ProtocolOptions.Compression compression) {
            this.compression = compression;
            return this;
        }

        /**
         * Disables metrics: {@link #getConfiguration()} will then pass
         * {@code null} as the metrics options.
         *
         * @return this builder
         */
        public Builder withoutMetrics() {
            this.metricsEnabled = false;
            return this;
        }

        /**
         * Enables SSL using an {@code SSLOptions} created with its no-argument
         * constructor.
         *
         * @return this builder
         */
        public Builder withSSL() {
            this.sslOptions = new SSLOptions();
            return this;
        }

        /**
         * Sets the SSL options to use.
         *
         * @param sslOptions the SSL options handed to {@code ProtocolOptions}
         * @return this builder
         */
        public Builder withSSL(SSLOptions sslOptions) {
            this.sslOptions = sslOptions;
            return this;
        }

        /**
         * Sets the listeners returned by {@link #getInitialListeners()}.
         * The collection is stored as-is, not copied.
         *
         * @param listeners the initial host state listeners
         * @return this builder
         */
        public Builder withInitialListeners(Collection<Host.StateListener> listeners) {
            this.listeners = listeners;
            return this;
        }

        /**
         * Turns off the JMX flag passed to {@code MetricsOptions}; has no effect
         * on the configuration if metrics are disabled altogether.
         *
         * @return this builder
         */
        public Builder withoutJMXReporting() {
            this.jmxEnabled = false;
            return this;
        }

        /**
         * Sets the pooling options; when left unset a new {@code PoolingOptions}
         * is created.
         *
         * @param options the pooling options
         * @return this builder
         */
        public Builder withPoolingOptions(PoolingOptions options) {
            this.poolingOptions = options;
            return this;
        }

        /**
         * Sets the socket options; when left unset a new {@code SocketOptions}
         * is created.
         *
         * @param options the socket options
         * @return this builder
         */
        public Builder withSocketOptions(SocketOptions options) {
            this.socketOptions = options;
            return this;
        }

        /**
         * Sets the query options; when left unset a new {@code QueryOptions}
         * is created.
         *
         * @param options the query options
         * @return this builder
         */
        public Builder withQueryOptions(QueryOptions options) {
            this.queryOptions = options;
            return this;
        }

        /**
         * Assembles a new {@code Configuration} from the current builder state,
         * substituting defaults for every policy or option that was not set.
         *
         * @return a freshly created configuration
         */
        @Override
        public Configuration getConfiguration() {
            Policies policies = new Policies(
                this.loadBalancingPolicy == null ? Policies.defaultLoadBalancingPolicy() : this.loadBalancingPolicy,
                this.reconnectionPolicy == null ? Policies.defaultReconnectionPolicy() : this.reconnectionPolicy,
                this.retryPolicy == null ? Policies.defaultRetryPolicy() : this.retryPolicy);
            ProtocolOptions protocolOptions =
                new ProtocolOptions(this.port, this.sslOptions, this.authProvider).setCompression(this.compression);
            PoolingOptions pooling = this.poolingOptions == null ? new PoolingOptions() : this.poolingOptions;
            SocketOptions socket = this.socketOptions == null ? new SocketOptions() : this.socketOptions;
            // A null MetricsOptions is how disabled metrics are passed to Configuration.
            MetricsOptions metrics = this.metricsEnabled ? new MetricsOptions(this.jmxEnabled) : null;
            QueryOptions query = this.queryOptions == null ? new QueryOptions() : this.queryOptions;
            return new Configuration(policies, protocolOptions, pooling, socket, metrics, query);
        }

        /**
         * @return the listeners set through
         *         {@link #withInitialListeners(Collection)}, or an empty set if
         *         none were set
         */
        @Override
        public Collection<Host.StateListener> getInitialListeners() {
            if (this.listeners == null) {
                // Explicit type witness keeps this compiling without Java 8 target typing.
                return Collections.<Host.StateListener>emptySet();
            }
            return this.listeners;
        }

        /**
         * Builds the cluster by passing this builder to {@code Cluster.buildFrom}.
         *
         * @return the cluster returned by {@code Cluster.buildFrom(this)}
         */
        public Cluster build() {
            return Cluster.buildFrom(this);
        }
    }

    /**
     * Supplies the settings a cluster is created from: its name, its contact
     * points, its configuration and its initial host state listeners.
     * {@code Builder} is the implementation visible in this file.
     */
    public interface Initializer {
        /** @return the cluster name */
        String getClusterName();

        /** @return the addresses of the contact points */
        List<InetAddress> getContactPoints();

        /** @return the configuration to use */
        Configuration getConfiguration();

        /** @return the host state listeners to start with */
        Collection<Host.StateListener> getInitialListeners();
    }
}

