/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import com.google.common.util.concurrent.AbstractService;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import javax.ws.rs.core.Response;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Worker
extends AbstractService {
    private static final Logger log = LoggerFactory.getLogger(Worker.class);
    private final WorkerConfig workerConfig;
    private final WorkerService workerService;
    private Thread serverThread;

    public Worker(WorkerConfig workerConfig) {
        this.workerConfig = workerConfig;
        this.workerService = new WorkerService(workerConfig);
    }

    protected void doStart() {
        try {
            this.doStartImpl();
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            log.error("Interrupted at starting worker", (Throwable)ie);
        }
        catch (Throwable t) {
            log.error("Failed to start worker", t);
        }
    }

    protected void doStartImpl() throws InterruptedException, IOException, PulsarAdminException {
        URI dlogUri = Worker.initialize(this.workerConfig);
        this.workerService.start(dlogUri);
        WorkerServer server = new WorkerServer(this.workerService);
        this.serverThread = new Thread((Runnable)server, server.getThreadName());
        log.info("Start worker server on port {}...", (Object)this.workerConfig.getWorkerPort());
        this.serverThread.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static URI initialize(WorkerConfig workerConfig) throws InterruptedException, PulsarAdminException, IOException {
        InternalConfigurationData internalConf;
        PulsarAdmin admin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(), workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(), workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());
        log.info("Checking if pulsar service at {} is up...", (Object)workerConfig.getPulsarWebServiceUrl());
        int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries();
        int retries = 0;
        while (true) {
            try {
                admin.clusters().getClusters();
            }
            catch (PulsarAdminException e) {
                log.warn("Failed to retrieve clusters from pulsar service", (Throwable)e);
                log.warn("Retry to connect to Pulsar service at {}", (Object)workerConfig.getPulsarWebServiceUrl());
                if (retries >= maxRetries) {
                    log.error("Failed to connect to Pulsar service at {} after {} attempts", (Object)workerConfig.getPulsarFunctionsNamespace(), (Object)maxRetries);
                    throw e;
                }
                ++retries;
                Thread.sleep(1000L);
                continue;
            }
            break;
        }
        log.info("Initializing Pulsar Functions namespace...");
        try {
            block17: {
                try {
                    admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace());
                }
                catch (PulsarAdminException e) {
                    if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
                        try {
                            Policies policies = new Policies();
                            policies.retention_policies = new RetentionPolicies(-1, -1);
                            policies.replication_clusters = new HashSet();
                            policies.replication_clusters.add(workerConfig.getPulsarFunctionsCluster());
                            admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace(), policies);
                        }
                        catch (PulsarAdminException e1) {
                            if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) {
                                log.error("Failed to create namespace {} for pulsar functions", (Object)workerConfig.getPulsarFunctionsNamespace(), (Object)e1);
                                throw e1;
                            }
                            break block17;
                        }
                    }
                    log.error("Failed to get retention policy for pulsar function namespace {}", (Object)workerConfig.getPulsarFunctionsNamespace(), (Object)e);
                    throw e;
                }
            }
            try {
                internalConf = admin.brokers().getInternalConfigurationData();
            }
            catch (PulsarAdminException e) {
                log.error("Failed to retrieve broker internal configuration", (Throwable)e);
                throw e;
            }
        }
        finally {
            admin.close();
        }
        try {
            return Utils.initializeDlogNamespace(internalConf.getZookeeperServers(), internalConf.getLedgersRootPath());
        }
        catch (IOException ioe) {
            log.error("Failed to initialize dlog namespace at zookeeper {} for storing function packages", (Object)internalConf.getZookeeperServers(), (Object)ioe);
            throw ioe;
        }
    }

    protected void doStop() {
        if (null != this.serverThread) {
            this.serverThread.interrupt();
            try {
                this.serverThread.join();
            }
            catch (InterruptedException e) {
                log.warn("Worker server thread is interrupted", (Throwable)e);
            }
        }
        this.workerService.stop();
    }
}

