package org.apache.inlong.manager.service.stream;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.threadPool.VisiableThreadPoolTaskExecutor;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Service that launches the resource workflows (create, suspend, restart, delete) for a single
 * inlong stream after validating the current status of the owning group and of the stream itself.
 *
 * <p>Every public operation takes the same four arguments:
 * <ul>
 *   <li>{@code str} - the inlong group id (looked up through {@code groupService.get}),</li>
 *   <li>{@code str2} - the inlong stream id (looked up through {@code streamService.get}),</li>
 *   <li>{@code str3} - the value handed to {@code workflowService.start} (and to
 *       {@code streamService.delete}); presumably the operator name - confirm against callers,</li>
 *   <li>{@code z} - when {@code true} the workflow is started on the calling thread and the result
 *       reflects its completion status; when {@code false} the work is submitted to
 *       {@link #EXECUTOR_SERVICE} and the method returns {@code true} immediately.</li>
 * </ul>
 */
@Service
public class InlongStreamProcessService {

    private static final Logger log = LoggerFactory.getLogger(InlongStreamProcessService.class);

    // Pool used for the asynchronous variants of each operation.
    // NOTE(review): the arguments look like the java.util.concurrent.ThreadPoolExecutor constructor
    // (core=10, max=20, keep-alive=100ms, bounded queue of 10000) - confirm against
    // VisiableThreadPoolTaskExecutor.
    private static final ExecutorService EXECUTOR_SERVICE = new VisiableThreadPoolTaskExecutor(
            10,
            20,
            100,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(10000),
            new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(),
            new ThreadPoolExecutor.CallerRunsPolicy());

    @Autowired
    private InlongGroupService groupService;

    @Autowired
    private InlongStreamService streamService;

    @Autowired
    private WorkflowService workflowService;

    /**
     * Starts the {@code CREATE_STREAM_RESOURCE} workflow for the given stream.
     *
     * <p>Returns {@code true} without starting anything when the stream is already in
     * {@code CONFIG_ING}.
     *
     * @param str group id
     * @param str2 stream id
     * @param str3 passed to {@code workflowService.start} in synchronous mode; unused in asynchronous mode
     * @param z {@code true} to run synchronously, {@code false} to submit to the executor
     * @return in synchronous mode, whether the process status is {@code COMPLETED}; otherwise {@code true}
     * @throws BusinessException if the group status is not {@code CONFIG_SUCCESSFUL}, or if
     *         {@code StreamStatus.notAllowedUpdate} rejects the stream status
     */
    public boolean startProcess(String str, String str2, String str3, boolean z) {
        log.info("begin to start stream process for groupId={} streamId={}", str, str2);
        InlongGroupInfo groupInfo = getExistingGroup(str);
        GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
        if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL) {
            throw new BusinessException(
                    String.format("group status=%s not support start stream for groupId=%s", groupStatus, str));
        }

        InlongStreamInfo streamInfo = getExistingStream(str, str2);
        StreamStatus streamStatus = StreamStatus.forCode(streamInfo.getStatus());
        if (streamStatus == StreamStatus.CONFIG_ING) {
            log.warn("stream status={}, not need restart for groupId={} streamId={}", streamStatus, str, str2);
            return true;
        }
        if (StreamStatus.notAllowedUpdate(streamStatus)) {
            String errMsg = String.format(
                    "stream status=%s not support start stream for groupId=%s streamId=%s", streamStatus, str, str2);
            log.error(errMsg);
            throw new BusinessException(errMsg);
        }

        ProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
                GroupOperateType.INIT);
        return launchWorkflow(ProcessName.CREATE_STREAM_RESOURCE, str3, processForm, z);
    }

    /**
     * Starts the {@code SUSPEND_STREAM_RESOURCE} workflow for the given stream.
     *
     * <p>Returns {@code true} without starting anything when the stream is already
     * {@code SUSPENDED} or {@code SUSPENDING}.
     *
     * @param str group id
     * @param str2 stream id
     * @param str3 passed to {@code workflowService.start} in synchronous mode; unused in asynchronous mode
     * @param z {@code true} to run synchronously, {@code false} to submit to the executor
     * @return in synchronous mode, whether the process status is {@code COMPLETED}; otherwise {@code true}
     * @throws BusinessException if {@code GroupStatus.allowedSuspend} rejects the group status, or if
     *         the stream status is neither {@code CONFIG_SUCCESSFUL} nor {@code RESTARTED}
     */
    public boolean suspendProcess(String str, String str2, String str3, boolean z) {
        log.info("begin to suspend stream process for groupId={} streamId={}", str, str2);
        InlongGroupInfo groupInfo = getExistingGroup(str);
        GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
        if (!GroupStatus.allowedSuspend(groupStatus)) {
            throw new BusinessException(
                    String.format("group status=%s not support suspend stream for groupId=%s", groupStatus, str));
        }

        InlongStreamInfo streamInfo = getExistingStream(str, str2);
        StreamStatus streamStatus = StreamStatus.forCode(streamInfo.getStatus());
        if (streamStatus == StreamStatus.SUSPENDED || streamStatus == StreamStatus.SUSPENDING) {
            log.warn("groupId={}, streamId={} is already in {}", str, str2, streamStatus);
            return true;
        }
        if (streamStatus != StreamStatus.CONFIG_SUCCESSFUL && streamStatus != StreamStatus.RESTARTED) {
            throw new BusinessException(String.format(
                    "stream status=%s not support suspend stream for groupId=%s streamId=%s",
                    streamStatus, str, str2));
        }

        ProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
                GroupOperateType.SUSPEND);
        return launchWorkflow(ProcessName.SUSPEND_STREAM_RESOURCE, str3, processForm, z);
    }

    /**
     * Starts the {@code RESTART_STREAM_RESOURCE} workflow for the given stream.
     *
     * <p>Returns {@code true} without starting anything when the stream is already
     * {@code RESTARTED} or {@code RESTARTING}.
     *
     * @param str group id
     * @param str2 stream id
     * @param str3 passed to {@code workflowService.start} in synchronous mode; unused in asynchronous mode
     * @param z {@code true} to run synchronously, {@code false} to submit to the executor
     * @return in synchronous mode, whether the process status is {@code COMPLETED}; otherwise {@code true}
     * @throws BusinessException if the group status is not {@code CONFIG_SUCCESSFUL}, or if the
     *         stream status is not {@code SUSPENDED}
     */
    public boolean restartProcess(String str, String str2, String str3, boolean z) {
        log.info("begin to restart stream process for groupId={} streamId={}", str, str2);
        InlongGroupInfo groupInfo = getExistingGroup(str);
        GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
        if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL) {
            throw new BusinessException(
                    String.format("group status=%s not support restart stream for groupId=%s", groupStatus, str));
        }

        InlongStreamInfo streamInfo = getExistingStream(str, str2);
        StreamStatus streamStatus = StreamStatus.forCode(streamInfo.getStatus());
        if (streamStatus == StreamStatus.RESTARTED || streamStatus == StreamStatus.RESTARTING) {
            log.warn("inlong stream was already in {} for groupId={}, streamId={}", streamStatus, str, str2);
            return true;
        }
        if (streamStatus != StreamStatus.SUSPENDED) {
            throw new BusinessException(String.format(
                    "stream status=%s not support restart stream for groupId=%s streamId=%s",
                    streamStatus, str, str2));
        }

        ProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
                GroupOperateType.RESTART);
        return launchWorkflow(ProcessName.RESTART_STREAM_RESOURCE, str3, processForm, z);
    }

    /**
     * Starts the {@code DELETE_STREAM_RESOURCE} workflow and, once that process reports
     * {@code COMPLETED}, deletes the stream through {@code streamService.delete}.
     *
     * <p>Returns {@code true} without starting anything when the stream is already
     * {@code DELETED} or {@code DELETING}.
     *
     * @param str group id
     * @param str2 stream id
     * @param str3 passed to {@code workflowService.start} and {@code streamService.delete}
     * @param z {@code true} to run synchronously, {@code false} to submit to the executor
     * @return in synchronous mode, the result of {@code streamService.delete} when the workflow
     *         completed and {@code false} otherwise; in asynchronous mode always {@code true}
     * @throws BusinessException if the group does not exist, if the group cannot transition to
     *         {@code CONFIG_DELETING}, or if {@code StreamStatus.notAllowedDelete} rejects the stream status
     */
    public boolean deleteProcess(String str, String str2, String str3, boolean z) {
        log.debug("begin to delete stream process for groupId={} streamId={}", str, str2);
        InlongGroupInfo groupInfo = this.groupService.get(str);
        if (groupInfo == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
                    ErrorCodeEnum.GROUP_NOT_FOUND.getMessage() + " : " + str);
        }
        GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
        if (GroupStatus.notAllowedTransition(groupStatus, GroupStatus.CONFIG_DELETING)) {
            throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED,
                    String.format("group status=%s not support delete stream for groupId=%s", groupStatus, str));
        }

        InlongStreamInfo streamInfo = getExistingStream(str, str2);
        StreamStatus streamStatus = StreamStatus.forCode(streamInfo.getStatus());
        if (streamStatus == StreamStatus.DELETED || streamStatus == StreamStatus.DELETING) {
            log.debug("groupId={}, streamId={} is already in {}", str, str2, streamStatus);
            return true;
        }
        if (StreamStatus.notAllowedDelete(streamStatus)) {
            throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED, String.format(
                    "stream status=%s not support delete stream for groupId=%s streamId=%s",
                    streamStatus, str, str2));
        }

        ProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
                GroupOperateType.DELETE);
        ProcessName processName = ProcessName.DELETE_STREAM_RESOURCE;
        if (z) {
            if (startAndCheckCompleted(processName, str3, processForm)) {
                return this.streamService.delete(str, str2, str3);
            }
            return false;
        }

        // Capture the login user on the calling thread; the task re-installs it on the thread that runs it.
        UserInfo loginUser = LoginUserUtils.getLoginUser();
        EXECUTOR_SERVICE.execute(() -> {
            LoginUserUtils.setUserLoginInfo(loginUser);
            try {
                if (startAndCheckCompleted(processName, str3, processForm)) {
                    this.streamService.delete(str, str2, str3);
                }
            } finally {
                // Always clear the login info so a failing workflow cannot leave it set on the executing thread.
                // NOTE(review): with CallerRunsPolicy a rejected task may run on the submitting thread, in which
                // case this also clears that thread's login info - confirm this is acceptable.
                LoginUserUtils.removeUserLoginInfo();
            }
        });
        return true;
    }

    /** Loads the group by id and hands it to {@code Preconditions.expectNotNull} with the GROUP_NOT_FOUND message. */
    private InlongGroupInfo getExistingGroup(String groupId) {
        InlongGroupInfo groupInfo = this.groupService.get(groupId);
        Preconditions.expectNotNull(groupInfo, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
        return groupInfo;
    }

    /** Loads the stream by ids and hands it to {@code Preconditions.expectNotNull} with the STREAM_NOT_FOUND message. */
    private InlongStreamInfo getExistingStream(String groupId, String streamId) {
        InlongStreamInfo streamInfo = this.streamService.get(groupId, streamId);
        Preconditions.expectNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
        return streamInfo;
    }

    /** Starts the workflow on the calling thread and reports whether its process status is {@code COMPLETED}. */
    private boolean startAndCheckCompleted(ProcessName processName, String operator, ProcessForm processForm) {
        return this.workflowService.start(processName, operator, processForm)
                .getProcessInfo().getStatus() == ProcessStatus.COMPLETED;
    }

    /**
     * Runs the workflow synchronously (returning its completion state) or submits
     * {@code workflowService.startAsync} to the executor with the current login user (returning {@code true}).
     */
    private boolean launchWorkflow(ProcessName processName, String operator, ProcessForm processForm, boolean sync) {
        if (sync) {
            return startAndCheckCompleted(processName, operator, processForm);
        }
        // The login user must be read on the calling thread, before the hand-off to the pool.
        UserInfo loginUser = LoginUserUtils.getLoginUser();
        EXECUTOR_SERVICE.execute(() -> {
            this.workflowService.startAsync(processName, loginUser, processForm);
        });
        return true;
    }
}
