/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.operation;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Starts, suspends, restarts and deletes a single inlong stream by launching the matching
 * stream-resource workflow through {@link WorkflowService}.
 *
 * <p>Every operation follows the same steps: load the group and verify its status, load the
 * stream, return early when the stream is already in (or moving to) the requested state,
 * reject stream statuses that do not allow the operation, then run the workflow either on
 * the calling thread ({@code sync == true}) or on the internal thread pool.
 */
@Service
public class InlongStreamProcessOperation {
    private static final Logger log = LoggerFactory.getLogger(InlongStreamProcessOperation.class);

    // NOTE(review): the work queue is unbounded, so per the ThreadPoolExecutor contract the pool
    // will not grow beyond the 20 core threads and the rejection policy is only consulted after
    // shutdown. This class never shuts the pool down. Configuration is kept as-is on purpose;
    // confirm whether a bounded queue and a lifecycle hook are wanted.
    private final ExecutorService executorService = new ThreadPoolExecutor(
            20,
            40,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(),
            new ThreadPoolExecutor.CallerRunsPolicy());

    @Autowired
    private InlongGroupService groupService;
    @Autowired
    private InlongStreamService streamService;
    @Autowired
    private WorkflowService workflowService;

    /**
     * Launches the {@code CREATE_STREAM_RESOURCE} workflow for the given stream.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @param operator name of the operator passed to the workflow
     * @param sync whether to run the workflow on the calling thread
     * @return {@code true} if the stream is already in {@code CONFIG_ING}, if the workflow was
     *         submitted asynchronously, or if the synchronous workflow ended as {@code COMPLETED};
     *         {@code false} if the synchronous workflow ended in any other status
     * @throws BusinessException if the group or stream does not exist, or either status does not
     *         allow a start
     */
    public boolean startProcess(String groupId, String streamId, String operator, boolean sync) {
        log.info("StartProcess for groupId={}, streamId={}", groupId, streamId);
        InlongGroupInfo groupInfo = requireGroup(groupId, "start",
                GroupStatus.CONFIG_SUCCESSFUL, GroupStatus.RESTARTED);
        InlongStreamInfo streamInfo = requireStream(groupId, streamId);

        StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
        if (status == StreamStatus.CONFIG_ING) {
            logAlreadyInStatus(groupId, streamId, status);
            return true;
        }
        if (!isOneOf(status, StreamStatus.NEW, StreamStatus.CONFIG_FAILED, StreamStatus.CONFIG_SUCCESSFUL)) {
            throw invalidStreamStatus(groupId, streamId, status, "start");
        }

        StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.INIT);
        return runWorkflow(ProcessName.CREATE_STREAM_RESOURCE, operator, processForm, sync, groupId, streamId);
    }

    /**
     * Launches the {@code SUSPEND_STREAM_RESOURCE} workflow for the given stream.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @param operator name of the operator passed to the workflow
     * @param sync whether to run the workflow on the calling thread
     * @return {@code true} if the stream is already {@code SUSPENDED}/{@code SUSPENDING}, if the
     *         workflow was submitted asynchronously, or if the synchronous workflow ended as
     *         {@code COMPLETED}; {@code false} otherwise
     * @throws BusinessException if the group or stream does not exist, or either status does not
     *         allow a suspend
     */
    public boolean suspendProcess(String groupId, String streamId, String operator, boolean sync) {
        log.info("SuspendProcess for groupId={}, streamId={}", groupId, streamId);
        InlongGroupInfo groupInfo = requireGroup(groupId, "suspend",
                GroupStatus.CONFIG_SUCCESSFUL, GroupStatus.RESTARTED, GroupStatus.SUSPENDED);
        InlongStreamInfo streamInfo = requireStream(groupId, streamId);

        StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
        if (isOneOf(status, StreamStatus.SUSPENDED, StreamStatus.SUSPENDING)) {
            logAlreadyInStatus(groupId, streamId, status);
            return true;
        }
        if (!isOneOf(status, StreamStatus.CONFIG_SUCCESSFUL, StreamStatus.RESTARTED)) {
            throw invalidStreamStatus(groupId, streamId, status, "suspend");
        }

        StreamResourceProcessForm processForm =
                genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.SUSPEND);
        return runWorkflow(ProcessName.SUSPEND_STREAM_RESOURCE, operator, processForm, sync, groupId, streamId);
    }

    /**
     * Launches the {@code RESTART_STREAM_RESOURCE} workflow for the given stream.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @param operator name of the operator passed to the workflow
     * @param sync whether to run the workflow on the calling thread
     * @return {@code true} if the stream is already {@code RESTARTED}/{@code RESTARTING}, if the
     *         workflow was submitted asynchronously, or if the synchronous workflow ended as
     *         {@code COMPLETED}; {@code false} otherwise
     * @throws BusinessException if the group or stream does not exist, the group status does not
     *         allow a restart, or the stream is not {@code SUSPENDED}
     */
    public boolean restartProcess(String groupId, String streamId, String operator, boolean sync) {
        log.info("RestartProcess for groupId={}, streamId={}", groupId, streamId);
        InlongGroupInfo groupInfo = requireGroup(groupId, "restart",
                GroupStatus.CONFIG_SUCCESSFUL, GroupStatus.RESTARTED);
        InlongStreamInfo streamInfo = requireStream(groupId, streamId);

        StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
        if (isOneOf(status, StreamStatus.RESTARTED, StreamStatus.RESTARTING)) {
            logAlreadyInStatus(groupId, streamId, status);
            return true;
        }
        if (status != StreamStatus.SUSPENDED) {
            throw invalidStreamStatus(groupId, streamId, status, "restart");
        }

        StreamResourceProcessForm processForm =
                genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.RESTART);
        return runWorkflow(ProcessName.RESTART_STREAM_RESOURCE, operator, processForm, sync, groupId, streamId);
    }

    /**
     * Launches the {@code DELETE_STREAM_RESOURCE} workflow for the given stream and, once the
     * workflow ends as {@code COMPLETED}, deletes the stream through {@link InlongStreamService}.
     *
     * @param groupId inlong group id
     * @param streamId inlong stream id
     * @param operator name of the operator passed to the workflow and to the delete call
     * @param sync whether to run the workflow (and the delete) on the calling thread
     * @return {@code true} if the stream is already {@code DELETED}/{@code DELETING} or if the
     *         work was submitted asynchronously; in synchronous mode, the result of the stream
     *         delete when the workflow completed, {@code false} otherwise
     * @throws BusinessException if the group or stream does not exist, the group status does not
     *         allow a delete, or the stream is in {@code CONFIG_ING}, {@code RESTARTING} or
     *         {@code SUSPENDING}
     */
    public boolean deleteProcess(String groupId, String streamId, String operator, boolean sync) {
        log.info("DeleteProcess for groupId={}, streamId={}", groupId, streamId);
        InlongGroupInfo groupInfo = requireGroup(groupId, "delete",
                GroupStatus.CONFIG_SUCCESSFUL, GroupStatus.RESTARTED, GroupStatus.SUSPENDED, GroupStatus.DELETING);
        InlongStreamInfo streamInfo = requireStream(groupId, streamId);

        StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
        if (isOneOf(status, StreamStatus.DELETED, StreamStatus.DELETING)) {
            logAlreadyInStatus(groupId, streamId, status);
            return true;
        }
        // Unlike the other operations, delete is rejected only while another transition is running.
        if (isOneOf(status, StreamStatus.CONFIG_ING, StreamStatus.RESTARTING, StreamStatus.SUSPENDING)) {
            throw invalidStreamStatus(groupId, streamId, status, "delete");
        }

        StreamResourceProcessForm processForm =
                genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.DELETE);
        ProcessName processName = ProcessName.DELETE_STREAM_RESOURCE;
        if (sync) {
            if (isCompleted(workflowService.start(processName, operator, processForm))) {
                return streamService.delete(groupId, streamId, operator);
            }
            return false;
        }
        runAsync(processName, groupId, streamId, () -> {
            if (isCompleted(workflowService.start(processName, operator, processForm))) {
                streamService.delete(groupId, streamId, operator);
            }
        });
        return true;
    }

    /** Loads the group and verifies that its status is one of {@code allowed}. */
    private InlongGroupInfo requireGroup(String groupId, String action, GroupStatus... allowed) {
        InlongGroupInfo groupInfo = groupService.get(groupId);
        if (groupInfo == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }
        GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
        if (!isOneOf(groupStatus, allowed)) {
            throw new BusinessException(
                    String.format("GroupId=%s, status=%s not correct for stream %s", groupId, groupStatus, action));
        }
        return groupInfo;
    }

    /** Loads the stream, failing with {@code STREAM_NOT_FOUND} when it does not exist. */
    private InlongStreamInfo requireStream(String groupId, String streamId) {
        InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
        if (streamInfo == null) {
            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
        }
        return streamInfo;
    }

    /** Logs that the stream is already in the status the requested operation leads to. */
    private void logAlreadyInStatus(String groupId, String streamId, StreamStatus status) {
        log.warn("GroupId={}, StreamId={} is already in {}", groupId, streamId, status);
    }

    /** Builds the exception thrown when the stream status does not allow {@code action}. */
    private BusinessException invalidStreamStatus(String groupId, String streamId, StreamStatus status,
            String action) {
        return new BusinessException(String.format(
                "GroupId=%s, StreamId=%s, status=%s not correct for stream %s", groupId, streamId, status, action));
    }

    /**
     * Runs the workflow on the calling thread and reports whether it completed, or submits it
     * to the thread pool and returns {@code true} immediately.
     */
    private boolean runWorkflow(ProcessName processName, String operator, ProcessForm processForm, boolean sync,
            String groupId, String streamId) {
        if (sync) {
            return isCompleted(workflowService.start(processName, operator, processForm));
        }
        runAsync(processName, groupId, streamId, () -> workflowService.start(processName, operator, processForm));
        return true;
    }

    /**
     * Submits {@code task} to the thread pool. The caller has already been answered by the time
     * the task runs, so a failure is logged here instead of escaping to the worker thread's
     * uncaught-exception handler, where it would bypass the application log.
     */
    private void runAsync(ProcessName processName, String groupId, String streamId, Runnable task) {
        executorService.execute(() -> {
            try {
                task.run();
            } catch (Exception e) {
                log.error("Async process {} failed for groupId={}, streamId={}",
                        processName, groupId, streamId, e);
            }
        });
    }

    /** Returns whether the workflow result reports the {@code COMPLETED} process status. */
    private static boolean isCompleted(WorkflowResult workflowResult) {
        return workflowResult.getProcessInfo().getStatus() == ProcessStatus.COMPLETED;
    }

    /** Identity-based membership test; a {@code null} value matches no candidate. */
    @SafeVarargs
    private static <T> boolean isOneOf(T value, T... candidates) {
        for (T candidate : candidates) {
            if (value == candidate) {
                return true;
            }
        }
        return false;
    }

    /** Assembles the workflow form from the group, the stream and the requested operation type. */
    private StreamResourceProcessForm genStreamProcessForm(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
            GroupOperateType operateType) {
        StreamResourceProcessForm processForm = new StreamResourceProcessForm();
        processForm.setGroupInfo(groupInfo);
        processForm.setStreamInfo(streamInfo);
        processForm.setGroupOperateType(operateType);
        return processForm;
    }
}

