package org.apache.pulsar.functions.worker;

import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import javax.ws.rs.core.Response;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/Worker.class */
public class Worker {
    private static final Logger log = LoggerFactory.getLogger(Worker.class);
    private final WorkerConfig workerConfig;
    private final WorkerService workerService;
    private WorkerServer server;

    public Worker(WorkerConfig workerConfig) {
        this.workerConfig = workerConfig;
        this.workerService = new WorkerService(workerConfig);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void start() throws Exception {
        this.workerService.start(initialize(this.workerConfig));
        this.server = new WorkerServer(this.workerService);
        this.server.start();
        log.info("Start worker server on port {}...", Integer.valueOf(this.workerConfig.getWorkerPort()));
    }

    private static URI initialize(WorkerConfig workerConfig) throws InterruptedException, PulsarAdminException, IOException {
        PulsarAdmin pulsarAdminClient = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(), workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(), workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());
        log.info("Checking if pulsar service at {} is up...", workerConfig.getPulsarWebServiceUrl());
        int initialBrokerReconnectMaxRetries = workerConfig.getInitialBrokerReconnectMaxRetries();
        int i = 0;
        while (true) {
            try {
                pulsarAdminClient.clusters().getClusters();
                log.info("Initializing Pulsar Functions namespace...");
                try {
                    try {
                        pulsarAdminClient.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace());
                    } catch (PulsarAdminException e) {
                        if (e.getStatusCode() != Response.Status.NOT_FOUND.getStatusCode()) {
                            log.error("Failed to get retention policy for pulsar function namespace {}", workerConfig.getPulsarFunctionsNamespace(), e);
                            throw e;
                        }
                        try {
                            Policies policies = new Policies();
                            policies.retention_policies = new RetentionPolicies(-1, -1);
                            policies.replication_clusters = new HashSet();
                            policies.replication_clusters.add(workerConfig.getPulsarFunctionsCluster());
                            pulsarAdminClient.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace(), policies);
                        } catch (PulsarAdminException e2) {
                            if (e2.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) {
                                log.error("Failed to create namespace {} for pulsar functions", workerConfig.getPulsarFunctionsNamespace(), e2);
                                throw e2;
                            }
                        }
                    }
                    try {
                        InternalConfigurationData internalConfigurationData = pulsarAdminClient.brokers().getInternalConfigurationData();
                        try {
                            return Utils.initializeDlogNamespace(internalConfigurationData.getZookeeperServers(), internalConfigurationData.getLedgersRootPath());
                        } catch (IOException e3) {
                            log.error("Failed to initialize dlog namespace at zookeeper {} for storing function packages", internalConfigurationData.getZookeeperServers(), e3);
                            throw e3;
                        }
                    } catch (PulsarAdminException e4) {
                        log.error("Failed to retrieve broker internal configuration", e4);
                        throw e4;
                    }
                } finally {
                    pulsarAdminClient.close();
                }
            } catch (PulsarAdminException e5) {
                log.warn("Failed to retrieve clusters from pulsar service", e5);
                log.warn("Retry to connect to Pulsar service at {}", workerConfig.getPulsarWebServiceUrl());
                if (i >= initialBrokerReconnectMaxRetries) {
                    log.error("Failed to connect to Pulsar service at {} after {} attempts", workerConfig.getPulsarFunctionsNamespace(), Integer.valueOf(initialBrokerReconnectMaxRetries));
                    throw e5;
                }
                i++;
                Thread.sleep(1000L);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void stop() {
        try {
            if (null != this.server) {
                this.server.stop();
            }
            this.workerService.stop();
        } catch (Exception e) {
            log.warn("Failed to gracefully stop worker service ", e);
        }
    }
}
