/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker.rest.api;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl;
import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.SchedulerManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.RestUtils;
import org.apache.pulsar.functions.worker.service.api.Workers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerImpl
implements Workers<PulsarWorkerService> {
    private static final Logger log = LoggerFactory.getLogger(WorkerImpl.class);
    private final Supplier<PulsarWorkerService> workerServiceSupplier;

    public WorkerImpl(Supplier<PulsarWorkerService> workerServiceSupplier) {
        this.workerServiceSupplier = workerServiceSupplier;
    }

    private PulsarWorkerService worker() {
        try {
            return (PulsarWorkerService)Preconditions.checkNotNull((Object)this.workerServiceSupplier.get());
        }
        catch (Throwable t) {
            log.info("Failed to get worker service", t);
            throw t;
        }
    }

    private boolean isWorkerServiceAvailable() {
        WorkerService workerService = this.workerServiceSupplier.get();
        if (workerService == null) {
            return false;
        }
        return workerService.isInitialized();
    }

    @Override
    public List<WorkerInfo> getCluster(AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        this.throwIfNotSuperUser(authParams, "get cluster");
        List<WorkerInfo> workers = this.worker().getMembershipManager().getCurrentMembership();
        return workers;
    }

    @Override
    public WorkerInfo getClusterLeader(AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        this.throwIfNotSuperUser(authParams, "get cluster leader");
        MembershipManager membershipManager = this.worker().getMembershipManager();
        WorkerInfo leader = membershipManager.getLeader();
        if (leader == null) {
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Leader cannot be determined");
        }
        return leader;
    }

    @Override
    public Map<String, Collection<String>> getAssignments(AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        this.throwIfNotSuperUser(authParams, "get cluster assignments");
        FunctionRuntimeManager functionRuntimeManager = this.worker().getFunctionRuntimeManager();
        Map<String, Map<String, Function.Assignment>> assignments = functionRuntimeManager.getCurrentAssignments();
        HashMap<String, Collection<String>> ret = new HashMap<String, Collection<String>>();
        for (Map.Entry<String, Map<String, Function.Assignment>> entry : assignments.entrySet()) {
            ret.put(entry.getKey(), entry.getValue().keySet());
        }
        return ret;
    }

    private void throwIfNotSuperUser(AuthenticationParameters authParams, String action) {
        if (this.worker().getWorkerConfig().isAuthorizationEnabled()) {
            try {
                if (authParams.getClientRole() == null || !((Boolean)this.worker().getAuthorizationService().isSuperUser(authParams).get(this.worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS)).booleanValue()) {
                    log.error("Client with role [{}] and originalPrincipal [{}] is not authorized to {}", new Object[]{authParams.getClientRole(), authParams.getOriginalPrincipal(), action});
                    throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
                }
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                log.warn("Time-out {} sec while checking the role {} originalPrincipal {} is a super user role ", new Object[]{this.worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), authParams.getClientRole(), authParams.getOriginalPrincipal()});
                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
            }
        }
    }

    @Override
    public List<Metrics> getWorkerMetrics(AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable() || this.worker().getMetricsGenerator() == null) {
            RestUtils.throwUnavailableException();
        }
        this.throwIfNotSuperUser(authParams, "get worker stats");
        return this.worker().getMetricsGenerator().generate();
    }

    @Override
    public List<WorkerFunctionInstanceStats> getFunctionsMetrics(AuthenticationParameters authParams) throws IOException {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        this.throwIfNotSuperUser(authParams, "get function stats");
        Map<String, FunctionRuntimeInfo> functionRuntimes = this.worker().getFunctionRuntimeManager().getFunctionRuntimeInfos();
        ArrayList<WorkerFunctionInstanceStats> metricsList = new ArrayList<WorkerFunctionInstanceStats>(functionRuntimes.size());
        for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
            String fullyQualifiedInstanceName = entry.getKey();
            FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
            if (this.worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
                Function.FunctionDetails functionDetails = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
                int parallelism = functionDetails.getParallelism();
                for (int i = 0; i < parallelism; ++i) {
                    FunctionInstanceStatsImpl functionInstanceStats = WorkerUtils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo, i);
                    WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
                    workerFunctionInstanceStats.setName(FunctionCommon.getFullyQualifiedInstanceId((String)functionDetails.getTenant(), (String)functionDetails.getNamespace(), (String)functionDetails.getName(), (int)i));
                    workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
                    metricsList.add(workerFunctionInstanceStats);
                }
                continue;
            }
            FunctionInstanceStatsImpl functionInstanceStats = WorkerUtils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo, functionRuntimeInfo.getFunctionInstance().getInstanceId());
            WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
            workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
            workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
            metricsList.add(workerFunctionInstanceStats);
        }
        return metricsList;
    }

    @Override
    public List<ConnectorDefinition> getListOfConnectors(AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        this.throwIfNotSuperUser(authParams, "get list of connectors");
        return this.worker().getConnectorsManager().getConnectorDefinitions();
    }

    @Override
    public void rebalance(URI uri, AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        this.throwIfNotSuperUser(authParams, "rebalance cluster");
        if (this.worker().getLeaderService().isLeader()) {
            try {
                this.worker().getSchedulerManager().rebalanceIfNotInprogress();
            }
            catch (SchedulerManager.RebalanceInProgressException e) {
                throw new RestException(Response.Status.BAD_REQUEST, "Rebalance already in progress");
            }
            catch (SchedulerManager.TooFewWorkersException e) {
                throw new RestException(Response.Status.BAD_REQUEST, "Too few workers (need at least 2)");
            }
        } else {
            WorkerInfo workerInfo = this.worker().getMembershipManager().getLeader();
            if (workerInfo == null) {
                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Leader cannot be determined");
            }
            URI redirect = UriBuilder.fromUri((URI)uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(new Object[0]);
            throw new WebApplicationException(Response.temporaryRedirect((URI)redirect).build());
        }
    }

    @Override
    public void drain(URI uri, String inWorkerId, AuthenticationParameters authParams, boolean calledOnLeaderUri) {
        String workerId;
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        String actualWorkerId = this.worker().getWorkerConfig().getWorkerId();
        String string = workerId = inWorkerId == null || inWorkerId.isEmpty() ? actualWorkerId : inWorkerId;
        if (log.isDebugEnabled()) {
            log.debug("drain called with URI={}, inWorkerId={}, workerId={}, clientRole={}, originalPrincipal={}, calledOnLeaderUri={}, on actual worker-id={}", new Object[]{uri, inWorkerId, workerId, authParams.getClientRole(), authParams.getOriginalPrincipal(), calledOnLeaderUri, actualWorkerId});
        }
        this.throwIfNotSuperUser(authParams, "drain worker");
        if (this.worker().getLeaderService().isLeader()) {
            try {
                this.worker().getSchedulerManager().drainIfNotInProgress(workerId);
            }
            catch (SchedulerManager.DrainInProgressException e) {
                throw new RestException(Response.Status.CONFLICT, "Another drain is in progress");
            }
            catch (SchedulerManager.TooFewWorkersException e) {
                throw new RestException(Response.Status.BAD_REQUEST, "Too few workers (need at least 2)");
            }
            catch (SchedulerManager.WorkerNotRemovedAfterPriorDrainException e) {
                String errString = "Worker " + workerId + " was not yet removed after a prior drain op; try later";
                throw new RestException(Response.Status.PRECONDITION_FAILED, errString);
            }
            catch (SchedulerManager.UnknownWorkerException e) {
                String errString = "Worker " + workerId + " is not among the current workers in the system";
                throw new RestException(Response.Status.BAD_REQUEST, errString);
            }
        } else {
            URI redirect = this.buildRedirectUriForDrainRelatedOp(uri, workerId);
            log.info("Not leader; redirect URI={}", (Object)redirect);
            throw new WebApplicationException(Response.temporaryRedirect((URI)redirect).build());
        }
    }

    @Override
    public LongRunningProcessStatus getDrainStatus(URI uri, String inWorkerId, AuthenticationParameters authParams, boolean calledOnLeaderUri) {
        String workerId;
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        String actualWorkerId = this.worker().getWorkerConfig().getWorkerId();
        String string = workerId = inWorkerId == null || inWorkerId.isEmpty() ? actualWorkerId : inWorkerId;
        if (log.isDebugEnabled()) {
            log.debug("getDrainStatus called with uri={}, inWorkerId={}, workerId={}, clientRole={}, originalPrincipal={}, calledOnLeaderUri={}, on actual workerId={}", new Object[]{uri, inWorkerId, workerId, authParams.getClientRole(), authParams.getOriginalPrincipal(), calledOnLeaderUri, actualWorkerId});
        }
        this.throwIfNotSuperUser(authParams, "get drain status of worker");
        if (this.worker().getLeaderService().isLeader()) {
            return this.worker().getSchedulerManager().getDrainStatus(workerId);
        }
        URI redirect = this.buildRedirectUriForDrainRelatedOp(uri, workerId);
        log.info("Not leader; redirect URI={}", (Object)redirect);
        throw new WebApplicationException(Response.temporaryRedirect((URI)redirect).build());
    }

    @Override
    public boolean isLeaderReady(AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        if (this.worker().getLeaderService().isLeader()) {
            return true;
        }
        RestUtils.throwUnavailableException();
        return false;
    }

    private URI buildRedirectUriForDrainRelatedOp(URI uri, String workerId) {
        String leaderPath = "admin/v2/worker/leader/drain";
        WorkerInfo workerInfo = this.worker().getMembershipManager().getLeader();
        if (workerInfo == null) {
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Leader cannot be determined");
        }
        URI redirect = UriBuilder.fromUri((URI)uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).replacePath(leaderPath).replaceQueryParam("workerId", new Object[]{workerId}).build(new Object[0]);
        return redirect;
    }
}

