package oadd.org.apache.drill.exec.coord.zk;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import oadd.com.google.common.base.Function;
import oadd.com.google.common.base.Throwables;
import oadd.com.google.common.collect.Collections2;
import oadd.org.apache.commons.collections.keyvalue.MultiKey;
import oadd.org.apache.curator.framework.CuratorFramework;
import oadd.org.apache.curator.framework.CuratorFrameworkFactory;
import oadd.org.apache.curator.framework.state.ConnectionState;
import oadd.org.apache.curator.framework.state.ConnectionStateListener;
import oadd.org.apache.curator.retry.RetryNTimes;
import oadd.org.apache.curator.x.discovery.ServiceCache;
import oadd.org.apache.curator.x.discovery.ServiceDiscovery;
import oadd.org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import oadd.org.apache.curator.x.discovery.ServiceInstance;
import oadd.org.apache.curator.x.discovery.details.ServiceCacheListener;
import oadd.org.apache.drill.common.AutoCloseables;
import oadd.org.apache.drill.common.config.DrillConfig;
import oadd.org.apache.drill.exec.ExecConstants;
import oadd.org.apache.drill.exec.coord.ClusterCoordinator;
import oadd.org.apache.drill.exec.coord.DistributedSemaphore;
import oadd.org.apache.drill.exec.coord.DrillServiceInstanceHelper;
import oadd.org.apache.drill.exec.coord.store.CachingTransientStoreFactory;
import oadd.org.apache.drill.exec.coord.store.TransientStore;
import oadd.org.apache.drill.exec.coord.store.TransientStoreConfig;
import oadd.org.apache.drill.exec.coord.store.TransientStoreFactory;
import oadd.org.apache.drill.exec.proto.CoordinationProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:oadd/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.class */
/**
 * {@link ClusterCoordinator} that talks to ZooKeeper through a Curator client and uses Curator
 * service discovery to register Drillbit endpoints and to track the set currently registered
 * under this coordinator's service name.
 *
 * <p>Endpoint bookkeeping is refreshed by {@link #updateEndpoints()}, which runs once at the end
 * of {@link #start(long)} and again every time the Curator service cache reports a change.
 */
public class ZKClusterCoordinator extends ClusterCoordinator {
    static final Logger logger = LoggerFactory.getLogger(ZKClusterCoordinator.class);

    /**
     * Matches a connect string with at least two {@code /} separators. Group 1 is the text before
     * the first slash, group 3 the text after the last slash, and group 2 everything in between.
     */
    private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$");

    private CuratorFramework curator;
    private ServiceDiscovery<CoordinationProtos.DrillbitEndpoint> discovery;

    /**
     * Endpoint collection handed out by {@link #getAvailableEndpoints()}. Starts empty; after the
     * first refresh it is the live {@code values()} view of {@link #endpointsMap}.
     */
    private volatile Collection<CoordinationProtos.DrillbitEndpoint> endpoints;

    private final String serviceName;

    /** Released the first time the Curator client reports {@link ConnectionState#CONNECTED}. */
    private final CountDownLatch initialConnection;

    private final TransientStoreFactory factory;

    /** Created in {@link #start(long)}; remains {@code null} until then. */
    private ServiceCache<CoordinationProtos.DrillbitEndpoint> serviceCache;

    /** Endpoint payload written by the most recent {@link #update} call. */
    private CoordinationProtos.DrillbitEndpoint endpoint;

    /** Known endpoints keyed by (address, user port). */
    private ConcurrentHashMap<MultiKey, CoordinationProtos.DrillbitEndpoint> endpointsMap;

    /** Service cache listener that refreshes the endpoint bookkeeping on every cache change. */
    private class EndpointListener implements ServiceCacheListener {

        /** Connection state changes are intentionally ignored by this listener. */
        @Override
        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
        }

        @Override
        public void cacheChanged() {
            logger.debug("Got cache changed --> updating endpoints");
            updateEndpoints();
        }
    }

    /**
     * One-shot connection listener: on the first {@code CONNECTED} state it releases
     * {@link #initialConnection} and removes itself from the client's listener set.
     */
    private class InitialConnectionListener implements ConnectionStateListener {

        @Override
        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
            if (connectionState == ConnectionState.CONNECTED) {
                initialConnection.countDown();
                curatorFramework.getConnectionStateListenable().removeListener(this);
            }
        }
    }

    /**
     * Creates a coordinator that takes its connect string from the configuration.
     *
     * @param drillConfig configuration supplying the ZooKeeper settings
     * @throws IOException declared for callers; see {@link #ZKClusterCoordinator(DrillConfig, String)}
     */
    public ZKClusterCoordinator(DrillConfig drillConfig) throws IOException {
        this(drillConfig, null);
    }

    /**
     * Creates the Curator client, starts it, and prepares service discovery and the transient
     * store factory. Service discovery itself is only started by {@link #start(long)}.
     *
     * <p>If the effective connect string matches {@code <connect>/<root>/<clusterId>}, the root
     * and cluster id embedded in it override the configured ones.
     *
     * @param drillConfig configuration supplying defaults, timeout and retry settings
     * @param str         connect string; when {@code null} or empty the configured
     *                    {@link ExecConstants#ZK_CONNECTION} value is used instead
     * @throws IOException declared by the signature; nothing in this body throws it directly
     */
    public ZKClusterCoordinator(DrillConfig drillConfig, String str) throws IOException {
        this.endpoints = Collections.emptyList();
        this.initialConnection = new CountDownLatch(1);
        this.endpointsMap = new ConcurrentHashMap<>();

        String connect = (str == null || str.isEmpty()) ? drillConfig.getString(ExecConstants.ZK_CONNECTION) : str;
        String clusterId = drillConfig.getString(ExecConstants.SERVICE_NAME);
        String zkRoot = drillConfig.getString(ExecConstants.ZK_ROOT);

        Matcher matcher = ZK_COMPLEX_STRING.matcher(connect);
        if (matcher.matches()) {
            connect = matcher.group(1);
            zkRoot = matcher.group(2);
            clusterId = matcher.group(3);
        }

        // All values go through placeholders so that braces inside a value cannot be
        // misread as part of the format string.
        logger.debug("Connect {}, zkRoot {}, clusterId: {}", connect, zkRoot, clusterId);

        this.serviceName = clusterId;
        this.curator = CuratorFrameworkFactory.builder()
            .namespace(zkRoot)
            .connectionTimeoutMs(drillConfig.getInt(ExecConstants.ZK_TIMEOUT))
            .retryPolicy(new RetryNTimes(
                drillConfig.getInt(ExecConstants.ZK_RETRY_TIMES),
                drillConfig.getInt(ExecConstants.ZK_RETRY_DELAY)))
            .connectString(connect)
            .build();
        // The listener must be in place before start() so the first CONNECTED event is not missed.
        this.curator.getConnectionStateListenable().addListener(new InitialConnectionListener());
        this.curator.start();
        this.discovery = newDiscovery();
        this.factory = CachingTransientStoreFactory.of(new ZkTransientStoreFactory(this.curator));
    }

    /** @return the Curator client created by the constructor */
    public CuratorFramework getCurator() {
        return this.curator;
    }

    /**
     * Starts service discovery, waits for the initial connection, then starts the service cache
     * and performs the first endpoint refresh.
     *
     * @param j maximum time to wait for the initial connection, in milliseconds; {@code 0} waits
     *          without a time limit
     * @throws IOException if a non-zero wait elapses before the client connects
     * @throws Exception   if discovery or the service cache fail to start, or the wait is interrupted
     */
    @Override
    public void start(long j) throws Exception {
        logger.debug("Starting ZKClusterCoordination.");
        this.discovery.start();

        if (j == 0) {
            this.initialConnection.await();
        } else if (!this.initialConnection.await(j, TimeUnit.MILLISECONDS)) {
            throw new IOException(String.format("Failure to connect to the zookeeper cluster service within the allotted time of %d milliseconds.", j));
        }

        this.serviceCache = this.discovery.serviceCacheBuilder().name(this.serviceName).build();
        this.serviceCache.addListener(new EndpointListener());
        this.serviceCache.start();
        updateEndpoints();
    }

    /**
     * Closes the service cache, service discovery, Curator client and transient store factory,
     * in that argument order, via {@link AutoCloseables#close}.
     */
    @Override
    public void close() throws Exception {
        AutoCloseables.close(this.serviceCache, this.discovery, this.curator, this.factory);
    }

    /**
     * Registers the given endpoint with its state forced to {@code ONLINE}.
     *
     * @param drillbitEndpoint endpoint to publish
     * @return handle carrying the generated service instance id and the published endpoint
     */
    @Override
    public ClusterCoordinator.RegistrationHandle register(CoordinationProtos.DrillbitEndpoint drillbitEndpoint) {
        try {
            CoordinationProtos.DrillbitEndpoint online = drillbitEndpoint.toBuilder()
                .setState(CoordinationProtos.DrillbitEndpoint.State.ONLINE)
                .build();
            ServiceInstance<CoordinationProtos.DrillbitEndpoint> instance = newServiceInstance(online);
            this.discovery.registerService(instance);
            return new ZKRegistrationHandle(instance.getId(), online);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Removes the registration identified by the handle. The inherited {@code listeners}
     * collection is cleared first.
     *
     * @param registrationHandle handle previously returned by {@link #register}
     * @throws UnsupportedOperationException if the handle is not a {@code ZKRegistrationHandle}
     */
    @Override
    public void unregister(ClusterCoordinator.RegistrationHandle registrationHandle) {
        if (!(registrationHandle instanceof ZKRegistrationHandle)) {
            throw new UnsupportedOperationException("Unknown handle type: " + registrationHandle.getClass().getName());
        }
        this.listeners.clear();
        try {
            // Only id and name are meaningful here; address and port are filled with
            // placeholders because the instance is built without a payload.
            ServiceInstance<CoordinationProtos.DrillbitEndpoint> instance =
                ServiceInstance.<CoordinationProtos.DrillbitEndpoint>builder()
                    .address("")
                    .port(0)
                    .id(((ZKRegistrationHandle) registrationHandle).id)
                    .name(this.serviceName)
                    .build();
            this.discovery.unregisterService(instance);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Republishes the handle's endpoint with a new state.
     *
     * @param registrationHandle handle previously returned by {@link #register}; must be a
     *                           {@code ZKRegistrationHandle}
     * @param state              state to publish
     * @return the same handle instance that was passed in (its stored endpoint is not replaced)
     */
    @Override
    public ClusterCoordinator.RegistrationHandle update(ClusterCoordinator.RegistrationHandle registrationHandle, CoordinationProtos.DrillbitEndpoint.State state) {
        ZKRegistrationHandle zkHandle = (ZKRegistrationHandle) registrationHandle;
        try {
            this.endpoint = zkHandle.endpoint.toBuilder().setState(state).build();
            ServiceInstance<CoordinationProtos.DrillbitEndpoint> instance =
                ServiceInstance.<CoordinationProtos.DrillbitEndpoint>builder()
                    .name(this.serviceName)
                    .id(zkHandle.id)
                    .payload(this.endpoint)
                    .build();
            this.discovery.updateService(instance);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
        return registrationHandle;
    }

    /**
     * @return the current endpoint collection. After the first refresh this is a live view of
     *         the internal map, so its contents can change while a caller holds it.
     */
    @Override
    public Collection<CoordinationProtos.DrillbitEndpoint> getAvailableEndpoints() {
        return this.endpoints;
    }

    /** @return a new collection holding only the known endpoints whose state is {@code ONLINE} */
    @Override
    public Collection<CoordinationProtos.DrillbitEndpoint> getOnlineEndPoints() {
        Collection<CoordinationProtos.DrillbitEndpoint> online = new ArrayList<>();
        for (CoordinationProtos.DrillbitEndpoint candidate : this.endpoints) {
            if (isDrillbitInState(candidate, CoordinationProtos.DrillbitEndpoint.State.ONLINE)) {
                online.add(candidate);
            }
        }
        // Placeholder form renders the same text but skips the toString() when debug is off.
        logger.debug("Online endpoints in ZK are{}", online);
        return online;
    }

    /**
     * Creates a semaphore rooted at {@code /semaphore/<str>}.
     *
     * @param str name appended to the semaphore path
     * @param i   passed through to {@code ZkDistributedSemaphore}; presumably the maximum number
     *            of leases (confirm against that class)
     */
    @Override
    public DistributedSemaphore getSemaphore(String str, int i) {
        return new ZkDistributedSemaphore(this.curator, "/semaphore/" + str, i);
    }

    /**
     * Looks up or creates the transient store for the given configuration through the caching
     * factory. The result is required to be a {@code ZkEphemeralStore}.
     */
    @Override
    public <V> TransientStore<V> getOrCreateTransientStore(TransientStoreConfig<V> transientStoreConfig) {
        return (ZkEphemeralStore<V>) this.factory.getOrCreateStore(transientStoreConfig);
    }

    /**
     * Re-queries service discovery and reconciles {@link #endpointsMap} with the result.
     *
     * <p>Endpoints whose (address, user port) key was not known before are reported through the
     * inherited {@code drillbitRegistered}; map entries whose value is no longer present in the
     * query result are removed and reported through {@code drillbitUnregistered}. Any exception,
     * including one thrown by those callbacks, is logged and not rethrown.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void updateEndpoints() {
        try {
            Collection<CoordinationProtos.DrillbitEndpoint> payloadView = Collections2.transform(
                this.discovery.queryForInstances(this.serviceName),
                new Function<ServiceInstance<CoordinationProtos.DrillbitEndpoint>, CoordinationProtos.DrillbitEndpoint>() {
                    @Override
                    public CoordinationProtos.DrillbitEndpoint apply(ServiceInstance<CoordinationProtos.DrillbitEndpoint> serviceInstance) {
                        return serviceInstance.getPayload();
                    }
                });
            // The transformed view is lazy and re-applies the function on every traversal, so
            // copy it once; the set gives constant-time membership checks further down.
            Collection<CoordinationProtos.DrillbitEndpoint> current = new ArrayList<>(payloadView);
            Set<CoordinationProtos.DrillbitEndpoint> currentSet = new HashSet<>(current);

            Set<CoordinationProtos.DrillbitEndpoint> unregistered = new HashSet<>();
            Set<CoordinationProtos.DrillbitEndpoint> registered = new HashSet<>();

            for (CoordinationProtos.DrillbitEndpoint candidate : current) {
                MultiKey key = new MultiKey(candidate.getAddress(), Integer.valueOf(candidate.getUserPort()));
                // put() returns null only when the key was absent, i.e. a newly seen endpoint.
                // An existing key still gets its value replaced.
                if (this.endpointsMap.put(key, candidate) == null) {
                    registered.add(candidate);
                }
            }

            // ConcurrentHashMap iterators tolerate removal during iteration.
            for (MultiKey key : this.endpointsMap.keySet()) {
                CoordinationProtos.DrillbitEndpoint known = this.endpointsMap.get(key);
                if (!currentSet.contains(known)) {
                    unregistered.add(known);
                    this.endpointsMap.remove(key);
                }
            }

            this.endpoints = this.endpointsMap.values();

            if (logger.isDebugEnabled()) {
                logger.debug(describeActiveEndpoints(current));
            }

            if (!unregistered.isEmpty()) {
                drillbitUnregistered(unregistered);
            }
            if (!registered.isEmpty()) {
                drillbitRegistered(registered);
            }
        } catch (Exception e) {
            logger.error("Failure while update Drillbit service location cache.", e);
        }
    }

    /** Renders the debug table listing every endpoint returned by the latest discovery query. */
    private static String describeActiveEndpoints(Collection<CoordinationProtos.DrillbitEndpoint> current) {
        StringBuilder sb = new StringBuilder();
        sb.append("Active drillbit set changed.  Now includes ");
        sb.append(current.size());
        sb.append(" total bits. New active drillbits:\n");
        sb.append("Address | User Port | Control Port | Data Port | Version | State\n");
        for (CoordinationProtos.DrillbitEndpoint bit : current) {
            sb.append(bit.getAddress()).append(" | ");
            sb.append(bit.getUserPort()).append(" | ");
            sb.append(bit.getControlPort()).append(" | ");
            sb.append(bit.getDataPort()).append(" | ");
            sb.append(bit.getVersion()).append(" |");
            sb.append(bit.getState()).append(" | ");
            sb.append('\n');
        }
        return sb.toString();
    }

    /**
     * Builds a service instance named after this coordinator's service name with the endpoint
     * as its payload.
     */
    protected ServiceInstance<CoordinationProtos.DrillbitEndpoint> newServiceInstance(CoordinationProtos.DrillbitEndpoint drillbitEndpoint) throws Exception {
        return ServiceInstance.<CoordinationProtos.DrillbitEndpoint>builder()
            .name(this.serviceName)
            .payload(drillbitEndpoint)
            .build();
    }

    /**
     * Builds the service discovery object on top of the Curator client, rooted at {@code /} and
     * using {@link DrillServiceInstanceHelper#SERIALIZER}. Called from the constructor, so
     * overrides must not depend on subclass state.
     */
    protected ServiceDiscovery<CoordinationProtos.DrillbitEndpoint> newDiscovery() {
        return ServiceDiscoveryBuilder.builder(CoordinationProtos.DrillbitEndpoint.class)
            .basePath("/")
            .client(this.curator)
            .serializer(DrillServiceInstanceHelper.SERIALIZER)
            .build();
    }
}
