/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker.rest.api;

import java.net.URI;
import java.util.Collection;
import java.util.LinkedList;
import java.util.function.Supplier;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FunctionsImpl
extends ComponentImpl {
    private static final Logger log = LoggerFactory.getLogger(FunctionsImpl.class);

    public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
        super(workerServiceSupplier, Utils.ComponentType.FUNCTION);
    }

    public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(String tenant, String namespace, String componentName, String instanceId, URI uri) {
        FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData;
        this.componentInstanceStatusRequestValidate(tenant, namespace, componentName, Integer.parseInt(instanceId));
        try {
            functionInstanceStatusData = (FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData)new GetFunctionStatus().getComponentInstanceStatus(tenant, namespace, componentName, Integer.parseInt(instanceId), uri);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("{}/{}/{} Got Exception Getting Status", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return functionInstanceStatusData;
    }

    public FunctionStatus getFunctionStatus(String tenant, String namespace, String componentName, URI uri) {
        FunctionStatus functionStatus;
        this.componentStatusRequestValidate(tenant, namespace, componentName);
        try {
            functionStatus = (FunctionStatus)new GetFunctionStatus().getComponentStatus(tenant, namespace, componentName, uri);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("{}/{}/{} Got Exception Getting Status", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return functionStatus;
    }

    private class GetFunctionStatus
    extends ComponentImpl.GetStatus<FunctionStatus, FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> {
        private GetFunctionStatus() {
            super(FunctionsImpl.this);
        }

        @Override
        public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData notScheduledInstance() {
            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData = new FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
            functionInstanceStatusData.setRunning(false);
            functionInstanceStatusData.setError("Function has not been scheduled");
            return functionInstanceStatusData;
        }

        @Override
        public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData fromFunctionStatusProto(InstanceCommunication.FunctionStatus status, String assignedWorkerId) {
            ExceptionInformation exceptionInformation;
            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData = new FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
            functionInstanceStatusData.setRunning(status.getRunning());
            functionInstanceStatusData.setError(status.getFailureException());
            functionInstanceStatusData.setNumRestarts(status.getNumRestarts());
            functionInstanceStatusData.setNumReceived(status.getNumReceived());
            functionInstanceStatusData.setNumSuccessfullyProcessed(status.getNumSuccessfullyProcessed());
            functionInstanceStatusData.setNumUserExceptions(status.getNumUserExceptions());
            LinkedList<ExceptionInformation> userExceptionInformationList = new LinkedList<ExceptionInformation>();
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
                ExceptionInformation exceptionInformation2 = new ExceptionInformation();
                exceptionInformation2.setTimestampMs(exceptionEntry.getMsSinceEpoch());
                exceptionInformation2.setExceptionString(exceptionEntry.getExceptionString());
                userExceptionInformationList.add(exceptionInformation2);
            }
            functionInstanceStatusData.setLatestUserExceptions(userExceptionInformationList);
            functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions() + status.getNumSourceExceptions() + status.getNumSinkExceptions());
            LinkedList<ExceptionInformation> systemExceptionInformationList = new LinkedList<ExceptionInformation>();
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
                exceptionInformation = new ExceptionInformation();
                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
                systemExceptionInformationList.add(exceptionInformation);
            }
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSourceExceptionsList()) {
                exceptionInformation = new ExceptionInformation();
                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
                systemExceptionInformationList.add(exceptionInformation);
            }
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSinkExceptionsList()) {
                exceptionInformation = new ExceptionInformation();
                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
                systemExceptionInformationList.add(exceptionInformation);
            }
            functionInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
            functionInstanceStatusData.setAverageLatency(status.getAverageLatency());
            functionInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
            functionInstanceStatusData.setWorkerId(assignedWorkerId);
            return functionInstanceStatusData;
        }

        @Override
        public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData notRunning(String assignedWorkerId, String error) {
            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData = new FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
            functionInstanceStatusData.setRunning(false);
            if (error != null) {
                functionInstanceStatusData.setError(error);
            }
            functionInstanceStatusData.setWorkerId(assignedWorkerId);
            return functionInstanceStatusData;
        }

        @Override
        public FunctionStatus getStatus(String tenant, String namespace, String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException {
            FunctionStatus functionStatus = new FunctionStatus();
            for (Function.Assignment assignment : assignments) {
                boolean isOwner = FunctionsImpl.this.worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
                FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData = isOwner ? (FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData)this.getComponentInstanceStatus(tenant, namespace, name, assignment.getInstance().getInstanceId(), null) : FunctionsImpl.this.worker().getFunctionAdmin().functions().getFunctionStatus(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), assignment.getInstance().getInstanceId());
                FunctionStatus.FunctionInstanceStatus instanceStatus = new FunctionStatus.FunctionInstanceStatus();
                instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
                instanceStatus.setStatus(functionInstanceStatusData);
                functionStatus.addInstance(instanceStatus);
            }
            functionStatus.setNumInstances(functionStatus.instances.size());
            functionStatus.getInstances().forEach(functionInstanceStatus -> {
                if (functionInstanceStatus.getStatus().isRunning()) {
                    ++functionStatus.numRunning;
                }
            });
            return functionStatus;
        }

        @Override
        public FunctionStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) {
            FunctionStatus functionStatus = new FunctionStatus();
            for (int i = 0; i < parallelism; ++i) {
                FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData = (FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData)this.getComponentInstanceStatus(tenant, namespace, name, i, null);
                FunctionStatus.FunctionInstanceStatus functionInstanceStatus2 = new FunctionStatus.FunctionInstanceStatus();
                functionInstanceStatus2.setInstanceId(i);
                functionInstanceStatus2.setStatus(functionInstanceStatusData);
                functionStatus.addInstance(functionInstanceStatus2);
            }
            functionStatus.setNumInstances(functionStatus.instances.size());
            functionStatus.getInstances().forEach(functionInstanceStatus -> {
                if (functionInstanceStatus.getStatus().isRunning()) {
                    ++functionStatus.numRunning;
                }
            });
            return functionStatus;
        }

        @Override
        public FunctionStatus emptyStatus(int parallelism) {
            FunctionStatus functionStatus = new FunctionStatus();
            functionStatus.setNumInstances(parallelism);
            functionStatus.setNumRunning(0);
            for (int i = 0; i < parallelism; ++i) {
                FunctionStatus.FunctionInstanceStatus functionInstanceStatus = new FunctionStatus.FunctionInstanceStatus();
                functionInstanceStatus.setInstanceId(i);
                FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData = new FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
                functionInstanceStatusData.setRunning(false);
                functionInstanceStatusData.setError("Function has not been scheduled");
                functionInstanceStatus.setStatus(functionInstanceStatusData);
                functionStatus.addInstance(functionInstanceStatus);
            }
            return functionStatus;
        }
    }
}

