/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker.rest.api;

import java.net.URI;
import java.util.Collection;
import java.util.LinkedList;
import java.util.function.Supplier;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SinkImpl
extends ComponentImpl {
    private static final Logger log = LoggerFactory.getLogger(SinkImpl.class);

    public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
        super(workerServiceSupplier, Utils.ComponentType.SINK);
    }

    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(String tenant, String namespace, String sinkName, String instanceId, URI uri) {
        SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData;
        this.componentInstanceStatusRequestValidate(tenant, namespace, sinkName, Integer.parseInt(instanceId));
        try {
            sinkInstanceStatusData = (SinkStatus.SinkInstanceStatus.SinkInstanceStatusData)new GetSinkStatus().getComponentInstanceStatus(tenant, namespace, sinkName, Integer.parseInt(instanceId), uri);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("{}/{}/{} Got Exception Getting Status", new Object[]{tenant, namespace, sinkName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return sinkInstanceStatusData;
    }

    public SinkStatus getSinkStatus(String tenant, String namespace, String componentName, URI uri) {
        SinkStatus sinkStatus;
        this.componentStatusRequestValidate(tenant, namespace, componentName);
        try {
            sinkStatus = (SinkStatus)new GetSinkStatus().getComponentStatus(tenant, namespace, componentName, uri);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("{}/{}/{} Got Exception Getting Status", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return sinkStatus;
    }

    public SinkConfig getSinkInfo(String tenant, String namespace, String componentName) {
        if (!this.isWorkerServiceAvailable()) {
            this.throwUnavailableException();
        }
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid get {} request @ /{}/{}/{}", new Object[]{this.componentType, tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} does not exist @ /{}/{}/{}", new Object[]{this.componentType, tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format(this.componentType + " %s doesn't exist", componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!this.calculateSubjectType(functionMetaData).equals((Object)this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, this.componentType});
            throw new RestException(Response.Status.NOT_FOUND, String.format(this.componentType + " %s doesn't exist", componentName));
        }
        SinkConfig config = SinkConfigUtils.convertFromDetails((Function.FunctionDetails)functionMetaData.getFunctionDetails());
        return config;
    }

    private class GetSinkStatus
    extends ComponentImpl.GetStatus<SinkStatus, SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
        private GetSinkStatus() {
            super(SinkImpl.this);
        }

        @Override
        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData notScheduledInstance() {
            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
            sinkInstanceStatusData.setRunning(false);
            sinkInstanceStatusData.setError("Sink has not been scheduled");
            return sinkInstanceStatusData;
        }

        @Override
        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData fromFunctionStatusProto(InstanceCommunication.FunctionStatus status, String assignedWorkerId) {
            ExceptionInformation exceptionInformation;
            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
            sinkInstanceStatusData.setRunning(status.getRunning());
            sinkInstanceStatusData.setError(status.getFailureException());
            sinkInstanceStatusData.setNumRestarts(status.getNumRestarts());
            sinkInstanceStatusData.setNumReadFromPulsar(status.getNumReceived());
            sinkInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions() + status.getNumUserExceptions() + status.getNumSourceExceptions());
            LinkedList<ExceptionInformation> systemExceptionInformationList = new LinkedList<ExceptionInformation>();
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
                exceptionInformation = new ExceptionInformation();
                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
                systemExceptionInformationList.add(exceptionInformation);
            }
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
                exceptionInformation = new ExceptionInformation();
                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
                systemExceptionInformationList.add(exceptionInformation);
            }
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSourceExceptionsList()) {
                exceptionInformation = new ExceptionInformation();
                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
                systemExceptionInformationList.add(exceptionInformation);
            }
            sinkInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
            sinkInstanceStatusData.setNumSinkExceptions(status.getNumSinkExceptions());
            LinkedList<ExceptionInformation> sinkExceptionInformationList = new LinkedList<ExceptionInformation>();
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSinkExceptionsList()) {
                ExceptionInformation exceptionInformation2 = new ExceptionInformation();
                exceptionInformation2.setTimestampMs(exceptionEntry.getMsSinceEpoch());
                exceptionInformation2.setExceptionString(exceptionEntry.getExceptionString());
                sinkExceptionInformationList.add(exceptionInformation2);
            }
            sinkInstanceStatusData.setLatestSinkExceptions(sinkExceptionInformationList);
            sinkInstanceStatusData.setNumWrittenToSink(status.getNumSuccessfullyProcessed());
            sinkInstanceStatusData.setLastReceivedTime(status.getLastInvocationTime());
            sinkInstanceStatusData.setWorkerId(assignedWorkerId);
            return sinkInstanceStatusData;
        }

        @Override
        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData notRunning(String assignedWorkerId, String error) {
            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
            sinkInstanceStatusData.setRunning(false);
            if (error != null) {
                sinkInstanceStatusData.setError(error);
            }
            sinkInstanceStatusData.setWorkerId(assignedWorkerId);
            return sinkInstanceStatusData;
        }

        @Override
        public SinkStatus getStatus(String tenant, String namespace, String name, Collection<Function.Assignment> assignments, URI uri) throws PulsarAdminException {
            SinkStatus sinkStatus = new SinkStatus();
            for (Function.Assignment assignment : assignments) {
                boolean isOwner = SinkImpl.this.worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = isOwner ? (SinkStatus.SinkInstanceStatus.SinkInstanceStatusData)this.getComponentInstanceStatus(tenant, namespace, name, assignment.getInstance().getInstanceId(), null) : SinkImpl.this.worker().getFunctionAdmin().sink().getSinkStatus(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), assignment.getInstance().getInstanceId());
                SinkStatus.SinkInstanceStatus instanceStatus = new SinkStatus.SinkInstanceStatus();
                instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
                instanceStatus.setStatus(sinkInstanceStatusData);
                sinkStatus.addInstance(instanceStatus);
            }
            sinkStatus.setNumInstances(sinkStatus.instances.size());
            sinkStatus.getInstances().forEach(sinkInstanceStatus -> {
                if (sinkInstanceStatus.getStatus().isRunning()) {
                    ++sinkStatus.numRunning;
                }
            });
            return sinkStatus;
        }

        @Override
        public SinkStatus getStatusExternal(String tenant, String namespace, String name, int parallelism) {
            SinkStatus sinkStatus = new SinkStatus();
            for (int i = 0; i < parallelism; ++i) {
                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = (SinkStatus.SinkInstanceStatus.SinkInstanceStatusData)this.getComponentInstanceStatus(tenant, namespace, name, i, null);
                SinkStatus.SinkInstanceStatus sinkInstanceStatus2 = new SinkStatus.SinkInstanceStatus();
                sinkInstanceStatus2.setInstanceId(i);
                sinkInstanceStatus2.setStatus(sinkInstanceStatusData);
                sinkStatus.addInstance(sinkInstanceStatus2);
            }
            sinkStatus.setNumInstances(sinkStatus.instances.size());
            sinkStatus.getInstances().forEach(sinkInstanceStatus -> {
                if (sinkInstanceStatus.getStatus().isRunning()) {
                    ++sinkStatus.numRunning;
                }
            });
            return sinkStatus;
        }

        @Override
        public SinkStatus emptyStatus(int parallelism) {
            SinkStatus sinkStatus = new SinkStatus();
            sinkStatus.setNumInstances(parallelism);
            sinkStatus.setNumRunning(0);
            for (int i = 0; i < parallelism; ++i) {
                SinkStatus.SinkInstanceStatus sinkInstanceStatus = new SinkStatus.SinkInstanceStatus();
                sinkInstanceStatus.setInstanceId(i);
                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
                sinkInstanceStatusData.setRunning(false);
                sinkInstanceStatusData.setError("Sink has not been scheduled");
                sinkInstanceStatus.setStatus(sinkInstanceStatusData);
                sinkStatus.addInstance(sinkInstanceStatus);
            }
            return sinkStatus;
        }
    }
}

