/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker.rest.api;

import java.net.URI;
import java.util.Collection;
import java.util.LinkedList;
import java.util.function.Supplier;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
import org.apache.pulsar.functions.worker.rest.RestUtils;
import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SourcesImpl
extends ComponentImpl {
    private static final Logger log = LoggerFactory.getLogger(SourcesImpl.class);

    public SourcesImpl(Supplier<WorkerService> workerServiceSupplier) {
        super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SOURCE);
    }

    public SourceStatus getSourceStatus(String tenant, String namespace, String componentName, URI uri, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        SourceStatus sourceStatus;
        this.componentStatusRequestValidate(tenant, namespace, componentName, clientRole, clientAuthenticationDataHttps);
        try {
            sourceStatus = (SourceStatus)new GetSourceStatus().getComponentStatus(tenant, namespace, componentName, uri);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("{}/{}/{} Got Exception Getting Status", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return sourceStatus;
    }

    public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus(String tenant, String namespace, String sourceName, String instanceId, URI uri, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData;
        this.componentInstanceStatusRequestValidate(tenant, namespace, sourceName, Integer.parseInt(instanceId), clientRole, clientAuthenticationDataHttps);
        try {
            sourceInstanceStatusData = (SourceStatus.SourceInstanceStatus.SourceInstanceStatusData)new GetSourceStatus().getComponentInstanceStatus(tenant, namespace, sourceName, Integer.parseInt(instanceId), uri);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("{}/{}/{} Got Exception Getting Status", new Object[]{tenant, namespace, sourceName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return sourceInstanceStatusData;
    }

    public SourceConfig getSourceInfo(String tenant, String namespace, String componentName) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid get {} request @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} does not exist @ /{}/{}/{}", new Object[]{ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType), tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format(ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType) + " %s doesn't exist", componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType((Function.FunctionDetails)functionMetaData.getFunctionDetails()).equals((Object)this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType)});
            throw new RestException(Response.Status.NOT_FOUND, String.format(ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)this.componentType) + " %s doesn't exist", componentName));
        }
        SourceConfig config = SourceConfigUtils.convertFromDetails((Function.FunctionDetails)functionMetaData.getFunctionDetails());
        return config;
    }

    private class GetSourceStatus
    extends ComponentImpl.GetStatus<SourceStatus, SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> {
        private GetSourceStatus() {
            super(SourcesImpl.this);
        }

        @Override
        public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData notScheduledInstance() {
            SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData = new SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
            sourceInstanceStatusData.setRunning(false);
            sourceInstanceStatusData.setError("Source has not been scheduled");
            return sourceInstanceStatusData;
        }

        @Override
        public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData fromFunctionStatusProto(InstanceCommunication.FunctionStatus status, String assignedWorkerId) {
            ExceptionInformation exceptionInformation;
            SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData = new SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
            sourceInstanceStatusData.setRunning(status.getRunning());
            sourceInstanceStatusData.setError(status.getFailureException());
            sourceInstanceStatusData.setNumRestarts(status.getNumRestarts());
            sourceInstanceStatusData.setNumReceivedFromSource(status.getNumReceived());
            sourceInstanceStatusData.setNumSourceExceptions(status.getNumSourceExceptions());
            LinkedList<ExceptionInformation> sourceExceptionInformationList = new LinkedList<ExceptionInformation>();
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSourceExceptionsList()) {
                ExceptionInformation exceptionInformation2 = new ExceptionInformation();
                exceptionInformation2.setTimestampMs(exceptionEntry.getMsSinceEpoch());
                exceptionInformation2.setExceptionString(exceptionEntry.getExceptionString());
                sourceExceptionInformationList.add(exceptionInformation2);
            }
            sourceInstanceStatusData.setLatestSourceExceptions(sourceExceptionInformationList);
            sourceInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions() + status.getNumUserExceptions() + status.getNumSinkExceptions());
            LinkedList<ExceptionInformation> systemExceptionInformationList = new LinkedList<ExceptionInformation>();
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
                exceptionInformation = new ExceptionInformation();
                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
                systemExceptionInformationList.add(exceptionInformation);
            }
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
                exceptionInformation = new ExceptionInformation();
                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
                systemExceptionInformationList.add(exceptionInformation);
            }
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSinkExceptionsList()) {
                exceptionInformation = new ExceptionInformation();
                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
                systemExceptionInformationList.add(exceptionInformation);
            }
            sourceInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
            sourceInstanceStatusData.setNumWritten(status.getNumSuccessfullyProcessed());
            sourceInstanceStatusData.setLastReceivedTime(status.getLastInvocationTime());
            sourceInstanceStatusData.setWorkerId(assignedWorkerId);
            return sourceInstanceStatusData;
        }

        @Override
        public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData notRunning(String assignedWorkerId, String error) {
            SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData = new SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
            sourceInstanceStatusData.setRunning(false);
            if (error != null) {
                sourceInstanceStatusData.setError(error);
            }
            sourceInstanceStatusData.setWorkerId(assignedWorkerId);
            return sourceInstanceStatusData;
        }

        @Override
        public SourceStatus getStatus(String tenant, String namespace, String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException {
            SourceStatus sourceStatus = new SourceStatus();
            for (Function.Assignment assignment : assignments) {
                boolean isOwner = SourcesImpl.this.worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
                SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData = isOwner ? (SourceStatus.SourceInstanceStatus.SourceInstanceStatusData)this.getComponentInstanceStatus(tenant, namespace, name, assignment.getInstance().getInstanceId(), null) : SourcesImpl.this.worker().getFunctionAdmin().source().getSourceStatus(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), assignment.getInstance().getInstanceId());
                SourceStatus.SourceInstanceStatus instanceStatus = new SourceStatus.SourceInstanceStatus();
                instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
                instanceStatus.setStatus(sourceInstanceStatusData);
                sourceStatus.addInstance(instanceStatus);
            }
            sourceStatus.setNumInstances(sourceStatus.instances.size());
            sourceStatus.getInstances().forEach(sourceInstanceStatus -> {
                if (sourceInstanceStatus.getStatus().isRunning()) {
                    ++sourceStatus.numRunning;
                }
            });
            return sourceStatus;
        }

        @Override
        public SourceStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) {
            SourceStatus sinkStatus = new SourceStatus();
            for (int i = 0; i < parallelism; ++i) {
                SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData = (SourceStatus.SourceInstanceStatus.SourceInstanceStatusData)this.getComponentInstanceStatus(tenant, namespace, name, i, null);
                SourceStatus.SourceInstanceStatus sourceInstanceStatus2 = new SourceStatus.SourceInstanceStatus();
                sourceInstanceStatus2.setInstanceId(i);
                sourceInstanceStatus2.setStatus(sourceInstanceStatusData);
                sinkStatus.addInstance(sourceInstanceStatus2);
            }
            sinkStatus.setNumInstances(sinkStatus.instances.size());
            sinkStatus.getInstances().forEach(sourceInstanceStatus -> {
                if (sourceInstanceStatus.getStatus().isRunning()) {
                    ++sinkStatus.numRunning;
                }
            });
            return sinkStatus;
        }

        @Override
        public SourceStatus emptyStatus(int parallelism) {
            SourceStatus sourceStatus = new SourceStatus();
            sourceStatus.setNumInstances(parallelism);
            sourceStatus.setNumRunning(0);
            for (int i = 0; i < parallelism; ++i) {
                SourceStatus.SourceInstanceStatus sourceInstanceStatus = new SourceStatus.SourceInstanceStatus();
                sourceInstanceStatus.setInstanceId(i);
                SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData = new SourceStatus.SourceInstanceStatus.SourceInstanceStatusData();
                sourceInstanceStatusData.setRunning(false);
                sourceInstanceStatusData.setError("Source has not been scheduled");
                sourceInstanceStatus.setStatus(sourceInstanceStatusData);
                sourceStatus.addInstance(sourceInstanceStatus);
            }
            return sourceStatus;
        }
    }
}

