/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.workflow.stream;

import java.util.Collections;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveSinkForStreamListener;
import org.apache.inlong.manager.service.thirdparty.mq.CreatePulsarGroupForStreamTaskListener;
import org.apache.inlong.manager.service.thirdparty.mq.CreatePulsarTopicForStreamTaskListener;
import org.apache.inlong.manager.service.thirdparty.sort.PushSortConfigListener;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import org.apache.inlong.manager.service.workflow.group.listener.InitGroupListener;
import org.apache.inlong.manager.service.workflow.stream.StreamCompleteProcessListener;
import org.apache.inlong.manager.service.workflow.stream.StreamFailedProcessListener;
import org.apache.inlong.manager.workflow.definition.Element;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
import org.apache.inlong.manager.workflow.definition.StartEvent;
import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
import org.apache.inlong.manager.workflow.definition.WorkflowTask;
import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Workflow definition for the "create stream resource" process.
 *
 * <p>The process built by {@link #defineProcess()} chains four service tasks between a start
 * event and an end event, in this order: create Pulsar topic, create Pulsar subscription,
 * create Hive table, push sort config. The two Pulsar tasks are skipped when the group's
 * middleware type is neither {@code PULSAR} nor {@code TDMQ_PULSAR}; the Hive task is skipped
 * when the sink service reports no {@code HIVE} sink for the stream.
 */
@Component
public class CreateStreamWorkflowDefinition
implements WorkflowDefinition {
    private static final Logger log = LoggerFactory.getLogger(CreateStreamWorkflowDefinition.class);
    /** Sink type passed to the sink service when looking for Hive sinks of a stream. */
    private static final String SINK_TYPE_HIVE = "HIVE";

    @Autowired
    private StreamSinkService sinkService;
    @Autowired
    private InitGroupListener initGroupListener;
    @Autowired
    private StreamFailedProcessListener streamFailedProcessListener;
    @Autowired
    private StreamCompleteProcessListener streamCompleteProcessListener;
    @Autowired
    private CreateHiveSinkForStreamListener createHiveTableListener;
    @Autowired
    private PushSortConfigListener pushSortConfigListener;
    @Autowired
    private CreatePulsarTopicForStreamTaskListener createPulsarTopicTaskListener;
    @Autowired
    private CreatePulsarGroupForStreamTaskListener createPulsarGroupTaskListener;

    /**
     * Builds the workflow process: registers the process-level listeners, sets the process
     * metadata, creates the four service tasks and links them into a single linear chain.
     *
     * @return the fully wired process definition
     */
    @Override
    public WorkflowProcess defineProcess() {
        // Process-level listeners and metadata
        WorkflowProcess process = new WorkflowProcess();
        process.addListener(this.initGroupListener);
        process.addListener(this.streamFailedProcessListener);
        process.addListener(this.streamCompleteProcessListener);
        process.setType("Inlong stream access resource creation");
        process.setName(this.getProcessName().name());
        process.setDisplayName(this.getProcessName().getDisplayName());
        process.setFormClass(GroupResourceProcessForm.class);
        process.setVersion(1);
        process.setHidden(1);

        // Start and end nodes
        StartEvent startEvent = new StartEvent();
        process.setStartEvent(startEvent);
        EndEvent endEvent = new EndEvent();
        process.setEndEvent(endEvent);

        // Pulsar topic: only relevant for Pulsar-based middleware
        ServiceTask createPulsarTopicTask = new ServiceTask();
        createPulsarTopicTask.setSkipResolver(
                c -> shouldSkipPulsarResource((GroupResourceProcessForm) c.getProcessForm(), "topic"));
        createPulsarTopicTask.setName("createPulsarTopic");
        createPulsarTopicTask.setDisplayName("Stream-CreatePulsarTopic");
        createPulsarTopicTask.addListener(this.createPulsarTopicTaskListener);
        process.addTask(createPulsarTopicTask);

        // Pulsar subscription: same skip rule as the topic task
        ServiceTask createPulsarSubscriptionGroupTask = new ServiceTask();
        createPulsarSubscriptionGroupTask.setSkipResolver(
                c -> shouldSkipPulsarResource((GroupResourceProcessForm) c.getProcessForm(), "subscription"));
        createPulsarSubscriptionGroupTask.setName("createPulsarSubscription");
        createPulsarSubscriptionGroupTask.setDisplayName("Stream-CreatePulsarSubscription");
        createPulsarSubscriptionGroupTask.addListener(this.createPulsarGroupTaskListener);
        process.addTask(createPulsarSubscriptionGroupTask);

        // Hive table: skipped when the stream has no Hive sink
        ServiceTask createHiveTableTask = new ServiceTask();
        createHiveTableTask.setSkipResolver(
                c -> this.shouldSkipHiveTable((GroupResourceProcessForm) c.getProcessForm()));
        createHiveTableTask.setName("createHiveTable");
        createHiveTableTask.setDisplayName("Stream-CreateHiveTable");
        createHiveTableTask.addListener(this.createHiveTableListener);
        process.addTask(createHiveTableTask);

        // Sort config push: no skip resolver is set for this task
        ServiceTask pushSortConfig = new ServiceTask();
        pushSortConfig.setName("pushSortConfig");
        pushSortConfig.setDisplayName("Stream-PushSortConfig");
        pushSortConfig.addListener(this.pushSortConfigListener);
        process.addTask(pushSortConfig);

        // Linear chain: start -> topic -> subscription -> hive table -> sort config -> end
        startEvent.addNext(createPulsarTopicTask);
        createPulsarTopicTask.addNext(createPulsarSubscriptionGroupTask);
        createPulsarSubscriptionGroupTask.addNext(createHiveTableTask);
        createHiveTableTask.addNext(pushSortConfig);
        pushSortConfig.addNext(endEvent);
        return process;
    }

    /**
     * Returns the name under which this process is registered.
     *
     * @return {@link ProcessName#CREATE_STREAM_RESOURCE}
     */
    @Override
    public ProcessName getProcessName() {
        return ProcessName.CREATE_STREAM_RESOURCE;
    }

    /**
     * Decides whether a Pulsar resource creation task should be skipped.
     *
     * @param form the process form carrying the group info and the group / stream ids
     * @param resource short resource name used only in the log message ("topic" or "subscription")
     * @return {@code false} when the middleware type is PULSAR or TDMQ_PULSAR, {@code true} otherwise
     */
    private static boolean shouldSkipPulsarResource(GroupResourceProcessForm form, String resource) {
        MQType mqType = MQType.forType(form.getGroupInfo().getMiddlewareType());
        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
            return false;
        }
        log.warn("no need to create pulsar {} for groupId={}, streamId={}, as the middlewareType={}",
                resource, form.getInlongGroupId(), form.getInlongStreamId(), mqType);
        return true;
    }

    /**
     * Decides whether the Hive table creation task should be skipped.
     *
     * @param form the process form carrying the group / stream ids
     * @return {@code true} when the sink service returns no stream id with a HIVE sink for the
     *         form's group and stream, {@code false} otherwise
     */
    private boolean shouldSkipHiveTable(GroupResourceProcessForm form) {
        String groupId = form.getInlongGroupId();
        String streamId = form.getInlongStreamId();
        List<String> streamsWithHiveSink = this.sinkService.getExistsStreamIdList(
                groupId, SINK_TYPE_HIVE, Collections.singletonList(streamId));
        if (CollectionUtils.isEmpty(streamsWithHiveSink)) {
            log.warn("inlong group [{}] and inlong stream [{}] does not have sink, skip create hive table",
                    groupId, streamId);
            return true;
        }
        return false;
    }
}

