package io.cdap.cdap.data2.datafabric.dataset.service;

import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.discovery.ResolvingDiscoverable;
import io.cdap.cdap.common.http.CommonNettyHttpServiceBuilder;
import io.cdap.cdap.common.metrics.MetricsReporterHook;
import io.cdap.cdap.data2.metrics.DatasetMetricsReporter;
import io.cdap.http.ChannelPipelineModifier;
import io.cdap.http.HttpHandler;
import io.cdap.http.NettyHttpService;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpRequest;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryService;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.ServiceDiscovered;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/cdap/cdap/data2/datafabric/dataset/service/DatasetService.class */
/**
 * Guava {@link AbstractService} that hosts the dataset type and dataset instance HTTP handlers.
 *
 * <p>Start-up and shutdown each run on their own dedicated thread. After the HTTP service is
 * started and {@link #notifyStarted()} has been called, the start-up thread waits until the
 * {@code dataset.executor} service becomes discoverable and only then announces this service as
 * {@code dataset.service} through the {@link DiscoveryService}.
 *
 * <p>Because the start-up thread and the shutdown thread can overlap, state shared between them is
 * either {@code volatile} or guarded by {@code discoveryLock}.
 */
public class DatasetService extends AbstractService {
    private static final Logger LOG = LoggerFactory.getLogger(DatasetService.class);

    /** Name used for the HTTP service, its metrics hook and its discovery announcement. */
    private static final String DATASET_SERVICE_NAME = "dataset.service";
    /** Name of the service that must be discoverable before this service announces itself. */
    private static final String OP_EXECUTOR_SERVICE_NAME = "dataset.executor";

    private final NettyHttpService httpService;
    private final DiscoveryService discoveryService;
    private final DiscoveryServiceClient discoveryServiceClient;
    private final Set<DatasetMetricsReporter> metricReporters;
    private final DatasetTypeService typeService;
    private final CConfiguration cConf;

    /** Serializes discovery registration (start-up thread) against its cancellation (shutdown). */
    private final Object discoveryLock = new Object();
    /** Handle for the discovery registration; guarded by {@link #discoveryLock}. */
    private Cancellable cancelDiscovery;
    // Written by the start-up thread and read by the shutdown thread, hence volatile.
    private volatile Cancellable opExecutorServiceWatch;
    private volatile SettableFuture<ServiceDiscovered> opExecutorDiscovered;
    private volatile boolean stopping;

    /**
     * Builds the HTTP service from configuration; nothing is started until the service is started.
     *
     * @param cConfiguration configuration supplying bind address, port, backlog and thread pool sizes
     * @param discoveryService used to announce this service once it is ready
     * @param discoveryServiceClient used to watch for the {@code dataset.executor} service
     * @param metricsCollectionService backs the metrics hook installed on the HTTP service
     * @param set metrics reporters started and stopped together with this service
     * @param datasetTypeService type service that is started before, and stopped with, this service
     * @param datasetInstanceService backs the dataset instance HTTP handler
     */
    @Inject
    public DatasetService(CConfiguration cConfiguration, DiscoveryService discoveryService, DiscoveryServiceClient discoveryServiceClient, MetricsCollectionService metricsCollectionService, Set<DatasetMetricsReporter> set, DatasetTypeService datasetTypeService, DatasetInstanceService datasetInstanceService) {
        this.cConf = cConfiguration;
        this.typeService = datasetTypeService;
        HttpHandler datasetTypeHandler = new DatasetTypeHandler(datasetTypeService);
        HttpHandler datasetInstanceHandler = new DatasetInstanceHandler(datasetInstanceService);
        CommonNettyHttpServiceBuilder builder = new CommonNettyHttpServiceBuilder(cConfiguration, DATASET_SERVICE_NAME);
        if (LOG.isTraceEnabled()) {
            // Only pay for the extra pipeline stage when trace logging is actually on.
            builder.addChannelPipelineModifier(new ChannelPipelineModifier() {
                @Override
                public void modify(ChannelPipeline channelPipeline) {
                    channelPipeline.addBefore("router", "logger", new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                            if (obj instanceof HttpRequest) {
                                HttpRequest httpRequest = (HttpRequest) obj;
                                LOG.trace("Received {} for {} on channel {}",
                                    httpRequest.method(), httpRequest.uri(), channelHandlerContext.channel());
                            }
                            super.channelRead(channelHandlerContext, obj);
                        }
                    });
                }
            });
        }
        this.httpService = builder
            .setHttpHandlers(datasetTypeHandler, datasetInstanceHandler)
            .setHandlerHooks(ImmutableList.of(new MetricsReporterHook(metricsCollectionService, DATASET_SERVICE_NAME)))
            .setHost(cConfiguration.get("master.services.bind.address"))
            .setPort(cConfiguration.getInt("dataset.service.bind.port"))
            .setConnectionBacklog(cConfiguration.getInt("dataset.service.connection.backlog"))
            .setExecThreadPoolSize(cConfiguration.getInt("dataset.service.exec.threads"))
            .setBossThreadPoolSize(cConfiguration.getInt("dataset.service.boss.threads"))
            .setWorkerThreadPoolSize(cConfiguration.getInt("dataset.service.worker.threads"))
            .build();
        this.discoveryService = discoveryService;
        this.discoveryServiceClient = discoveryServiceClient;
        this.metricReporters = set;
    }

    /** Kicks off {@link #startUp()} on a dedicated thread so the caller is not blocked. */
    @Override
    protected void doStart() {
        Thread thread = new Thread(this::startUp);
        thread.setName("dataset.service-startup");
        thread.start();
    }

    /** Flags the service as stopping and runs {@link #shutDown()} on a dedicated thread. */
    @Override
    protected void doStop() {
        // Set before the shutdown thread starts so a start-up thread still waiting for the
        // executor service notices and does not announce a service that is going away.
        this.stopping = true;
        Thread thread = new Thread(this::shutDown);
        thread.setName("dataset.service-shutdown");
        thread.start();
    }

    /**
     * Start-up sequence: starts the type service and HTTP service, begins watching for the
     * {@code dataset.executor} service, starts the metrics reporters, reports the service as
     * started, and finally announces it for discovery once the executor service is visible.
     */
    private void startUp() {
        try {
            LOG.info("Starting DatasetService...");
            this.typeService.startAndWait();
            this.httpService.start();

            ServiceDiscovered discovered = this.discoveryServiceClient.discover(OP_EXECUTOR_SERVICE_NAME);
            // The future must exist before the watch is installed: with a same-thread executor the
            // listener may be invoked from inside watchChanges().
            this.opExecutorDiscovered = SettableFuture.create();
            this.opExecutorServiceWatch = discovered.watchChanges(serviceDiscovered -> {
                if (Iterables.isEmpty(serviceDiscovered)) {
                    return;
                }
                LOG.info("Discovered {} service", OP_EXECUTOR_SERVICE_NAME);
                this.opExecutorDiscovered.set(serviceDiscovered);
            }, MoreExecutors.sameThreadExecutor());

            for (DatasetMetricsReporter reporter : this.metricReporters) {
                reporter.start();
            }
            notifyStarted();

            try {
                waitForOpExecutorToStart();
                announceForDiscovery();
            } catch (Throwable th) {
                // Failure after the service was reported as started: release what was started,
                // keeping any secondary failure attached to the primary one.
                try {
                    doShutdown();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                notifyFailed(th);
            }
        } catch (Throwable th3) {
            // NOTE(review): a failure before notifyStarted() does not stop typeService/httpService
            // here; confirm whether doShutdown() is safe to call on partially started components.
            notifyFailed(th3);
        }
    }

    /**
     * Registers this service with the discovery service unless a stop has been requested.
     *
     * <p>The check and the registration happen under {@link #discoveryLock}, which
     * {@link #doShutdown()} also takes when cancelling, so a registration can never be created
     * after shutdown has already looked for one to cancel.
     */
    private void announceForDiscovery() {
        synchronized (this.discoveryLock) {
            if (this.stopping) {
                LOG.info("DatasetService is stopping, skipping discovery announcement");
                return;
            }
            InetSocketAddress bindAddress = this.httpService.getBindAddress();
            InetSocketAddress announceAddress = new InetSocketAddress(
                this.cConf.get("master.services.announce.address", bindAddress.getHostName()),
                this.cConf.getInt("dataset.service.announce.port", bindAddress.getPort()));
            LOG.info("Announcing DatasetService for discovery...");
            this.cancelDiscovery = this.discoveryService.register(
                ResolvingDiscoverable.of(new Discoverable(DATASET_SERVICE_NAME, announceAddress)));
            LOG.info("DatasetService started successfully on {}", announceAddress);
        }
    }

    /**
     * Blocks, polling once per second, until the {@code dataset.executor} service has been
     * discovered, the service is asked to stop, or the thread is interrupted.
     *
     * @throws Exception the {@link ExecutionException} raised by the discovery future, if any
     */
    private void waitForOpExecutorToStart() throws Exception {
        LOG.info("Waiting for {} service to be discoverable", OP_EXECUTOR_SERVICE_NAME);
        while (!this.stopping) {
            try {
                this.opExecutorDiscovered.get(1L, TimeUnit.SECONDS);
                this.opExecutorServiceWatch.cancel();
                return;
            } catch (InterruptedException e) {
                // NOTE(review): the caller still goes on to announce the service after an
                // interrupt; confirm that is intended.
                LOG.warn("Got interrupted while waiting for service {}", OP_EXECUTOR_SERVICE_NAME);
                Thread.currentThread().interrupt();
                this.opExecutorServiceWatch.cancel();
                return;
            } catch (ExecutionException e) {
                // Pass the exception as the last argument so the cause ends up in the log.
                LOG.error("Error during discovering service {}, DatasetService start failed", OP_EXECUTOR_SERVICE_NAME, e);
                this.opExecutorServiceWatch.cancel();
                throw e;
            } catch (TimeoutException ignored) {
                // Not discovered yet: loop again so a stop request is noticed within a second.
            }
        }
    }

    /** Runs {@link #doShutdown()} and reports the outcome to the service state machine. */
    protected void shutDown() {
        try {
            doShutdown();
            notifyStopped();
        } catch (Throwable th) {
            notifyFailed(th);
        }
    }

    /**
     * Releases everything acquired during start-up: the discovery registration, the metrics
     * reporters, the executor-service watch, the type service and the HTTP service.
     *
     * @throws Exception if stopping any of the components fails
     */
    private void doShutdown() throws Exception {
        LOG.info("Stopping DatasetService...");
        synchronized (this.discoveryLock) {
            if (this.cancelDiscovery != null) {
                this.cancelDiscovery.cancel();
            }
        }
        for (DatasetMetricsReporter reporter : this.metricReporters) {
            reporter.stop();
        }
        Cancellable watch = this.opExecutorServiceWatch;
        if (watch != null) {
            watch.cancel();
        }
        this.typeService.stopAndWait();
        this.httpService.stop(3L, 5L, TimeUnit.SECONDS);
        LOG.info("DatasetService stopped");
    }

    /** Returns a description of this service that includes the HTTP bind address. */
    @Override
    public String toString() {
        return Objects.toStringHelper(this).add("bindAddress", this.httpService.getBindAddress()).toString();
    }
}
