package org.apache.synapse.message.processor.impl;

import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseException;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.message.MessageConsumer;
import org.apache.synapse.message.processor.MessageProcessorCleanupService;
import org.apache.synapse.message.processor.MessageProcessorConstants;
import org.apache.synapse.message.processor.impl.failover.FailoverForwardingService;
import org.apache.synapse.message.processor.impl.forwarder.ForwardingProcessorConstants;
import org.apache.synapse.message.processor.impl.forwarder.ForwardingService;
import org.apache.synapse.message.processor.impl.sampler.SamplingService;
import org.apache.synapse.message.senders.blocking.BlockingMsgSender;
import org.apache.synapse.registry.Registry;
import org.apache.synapse.task.SynapseTaskException;
import org.apache.synapse.task.Task;
import org.apache.synapse.task.TaskDescription;
import org.apache.synapse.task.TaskManager;
import org.apache.synapse.task.TaskManagerObserver;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:WEB-INF/lib/synapse-core-4.0.0-wso2v3.jar:org/apache/synapse/message/processor/impl/ScheduledMessageProcessor.class */
public abstract class ScheduledMessageProcessor extends AbstractMessageProcessor implements TaskManagerObserver {
    /** Commons-logging logger shared by all instances of this class. */
    private static final Log logger = LogFactory.getLog(ScheduledMessageProcessor.class.getName());
    /** Blocking sender created and initialised by {@code initMessageSender} during {@code init}. */
    protected BlockingMsgSender sender;
    /** Environment handed to {@code init}; used to reach the Synapse configuration. */
    protected SynapseEnvironment synapseEnvironment;
    /** The task most recently obtained from {@code getTask()} in {@code start()}; {@code terminate()} acts on it. */
    private Task task;
    /** Registry taken from the Synapse configuration in {@code init}; holds the persisted processor state. */
    private Registry registry;
    /** Registry path prefix; the processor name is appended to form the state resource path. */
    private static final String REG_PROCESSOR_BASE_PATH = "/repository/components/org.apache.synapse.message.processor/";
    /** Name of the registry property that stores the {@link ProcessorState}. */
    private static final String MP_STATE = "MESSAGE_PROCESSOR_STATE";
    /** Prefix of every task name built by this class: {@code TASK_PREFIX + name + "_" + index}. */
    private static final String TASK_PREFIX = "MSMP_";
    /** Separator between processor name and task index. NOTE(review): not referenced by name in the visible code, which uses the literal "_". */
    private static final String SYMBOL_UNDERSCORE = "_";
    /** Index suffix of the first task. NOTE(review): not referenced by name in the visible code, which uses the literal "0". */
    private static final String DEFAULT_TASK_SUFFIX = "0";
    /** Scheduling interval passed to the task description; {@code start()} raises values below 1000 to 1000. */
    protected long interval = 1000;
    /** Optional cron expression; applied to the task description only when non-null. */
    protected String cronExpression = null;
    /** Status codes parsed from a comma-separated parameter in {@code setParameters}; not read in this class. */
    protected String[] nonRetryStatusCodes = null;
    /** Task manager resolved from the Synapse configuration in {@code init} when not already set. */
    private TaskManager taskManager = null;
    /** Number of tasks scheduled, paused, resumed and stopped for this processor. */
    private int memberCount = 1;
    /**
     * State of a message processor as stored in the registry under the
     * {@code MESSAGE_PROCESSOR_STATE} property.
     *
     * <p>NOTE: the decompiler reports that this enum was declared {@code private}
     * in the original source and that the modifier was widened.
     */
    public enum ProcessorState {
        /** Reported when no registry resource exists yet for the processor. */
        INITIAL,
        /** Written by {@code activate()} and {@code resumeRemotely()}. */
        RUNNING,
        /** Written by {@code deactivate()} and {@code cleanUpDeactivatedProcessors()}. */
        PAUSED
    }

    /**
     * Initialises the processor: builds the blocking sender from the configured
     * parameters, picks up the registry, runs the superclass initialisation and
     * resolves the task manager.
     *
     * <p>If the task manager is already initialised the processor is started right
     * away; otherwise this instance registers itself as an observer and the tasks
     * are started from {@link #update()}.
     *
     * @param synapseEnvironment the environment providing the Synapse configuration
     * @throws SynapseException if no task manager is available
     */
    @Override
    public void init(SynapseEnvironment synapseEnvironment) {
        this.synapseEnvironment = synapseEnvironment;
        initMessageSender(this.parameters);
        this.registry = synapseEnvironment.getSynapseConfiguration().getRegistry();
        super.init(synapseEnvironment);

        if (this.taskManager == null) {
            // Fall back to the manager declared in the Synapse configuration.
            this.taskManager = this.synapseEnvironment.getSynapseConfiguration().getTaskManager();
            if (this.taskManager == null) {
                throw new SynapseException("Task Manager not defined in the configuration.");
            }
        }

        if (!this.taskManager.isInitialized()) {
            // Defer scheduling until the manager calls back through update().
            this.taskManager.addObserver(this);
            return;
        }
        start();
    }

    /**
     * Reads the {@code IS_ACTIVATED} entry from the processor parameters.
     *
     * @return {@code true} when the parameter is absent; otherwise the result of
     *         {@link Boolean#parseBoolean(String)} on its string form
     */
    public boolean getIsActivatedParamValue() {
        Object configured = this.parameters.get(MessageProcessorConstants.IS_ACTIVATED);
        if (configured == null) {
            // Missing parameter means "activated".
            return true;
        }
        return Boolean.parseBoolean(String.valueOf(configured));
    }

    /**
     * Decides whether the processor must be deactivated immediately after its
     * tasks have been scheduled.
     *
     * <p>When no state has been persisted yet ({@code INITIAL}) the decision comes
     * from the {@code IS_ACTIVATED} parameter; otherwise the processor starts
     * deactivated exactly when the persisted state is {@code PAUSED}.
     *
     * <p>The persisted state is read once so that both halves of the decision use
     * the same registry snapshot.
     *
     * <p>NOTE: the decompiler reports that this method was declared
     * {@code protected} in the original source.
     *
     * @return {@code true} if the processor should start in the deactivated state
     */
    public boolean isProcessorStartAsDeactivated() {
        ProcessorState persisted = getProcessorState();
        if (persisted == ProcessorState.INITIAL) {
            return !getIsActivatedParamValue();
        }
        return persisted == ProcessorState.PAUSED;
    }

    /**
     * Schedules one task per member with the task manager and, if required,
     * deactivates the processor straight afterwards.
     *
     * <p>Each task is named {@code MSMP_<name>_<index>}; intervals below 1000 are
     * raised to 1000 and the interval is flagged as being in milliseconds.
     *
     * @return always {@code true}
     */
    @Override
    public boolean start() {
        for (int member = 0; member < this.memberCount; member++) {
            this.task = getTask();

            TaskDescription description = new TaskDescription();
            description.setName(TASK_PREFIX + this.name + "_" + member);
            description.setTaskGroup(MessageProcessorConstants.SCHEDULED_MESSAGE_PROCESSOR_GROUP);
            // Enforce the minimum polling interval.
            description.setInterval(this.interval < 1000 ? 1000L : this.interval);
            description.setIntervalInMs(true);
            description.addResource(TaskDescription.INSTANCE, this.task);
            description.addResource("ClassName", this.task.getClass().getName());
            if (this.cronExpression != null) {
                description.setCronExpression(this.cronExpression);
            }

            this.taskManager.schedule(description);
        }

        logger.info("Started message processor. [" + getName() + "].");

        if (isProcessorStartAsDeactivated()) {
            deactivate();
        }
        return true;
    }

    /**
     * Asks the task manager whether this processor's first task
     * ({@code MSMP_<name>_0}) is deactivated.
     *
     * @return the task manager's answer for the first task
     */
    @Override
    public boolean isDeactivated() {
        final String firstTaskName = TASK_PREFIX + this.name + "_0";
        return this.taskManager.isTaskDeactivated(firstTaskName);
    }

    /**
     * Stores the parameters through the superclass and then extracts the settings
     * this class cares about: cron expression, interval, member count, activation
     * flag and the comma-separated list of non-retry status codes.
     *
     * <p>A {@code null} or empty map leaves all of these fields untouched.
     *
     * @param map the processor parameters; may be {@code null}
     * @throws NumberFormatException if the interval or member count is not a valid
     *         integer
     */
    @Override
    public void setParameters(Map<String, Object> map) {
        super.setParameters(map);
        if (map == null || map.isEmpty()) {
            return;
        }

        Object cronValue = map.get(MessageProcessorConstants.CRON_EXPRESSION);
        if (cronValue != null) {
            this.cronExpression = cronValue.toString();
        }

        // The looked-up value must be bound to a local before it is parsed; the
        // previous code referenced an undefined identifier here.
        Object intervalValue = map.get(MessageProcessorConstants.INTERVAL);
        if (intervalValue != null) {
            this.interval = Integer.parseInt(intervalValue.toString());
        }

        Object memberCountValue = map.get(MessageProcessorConstants.MEMBER_COUNT);
        if (memberCountValue != null) {
            this.memberCount = Integer.parseInt(memberCountValue.toString());
        }

        Object activatedValue = map.get(MessageProcessorConstants.IS_ACTIVATED);
        if (activatedValue != null) {
            // Normalise the flag to "true"/"false" in the stored parameters.
            setActivated(Boolean.parseBoolean(activatedValue.toString()));
        }

        Object nonRetryValue = map.get(ForwardingProcessorConstants.NON_RETRY_STATUS_CODES);
        if (nonRetryValue != null) {
            this.nonRetryStatusCodes = nonRetryValue.toString().split(",");
        }
    }

    /**
     * Stops and deletes all tasks belonging to this processor.
     *
     * @return the result of {@code stopTasks}, or {@code false} if a
     *         {@link SynapseTaskException} was raised (the error is logged)
     */
    @Override
    public boolean stop() {
        try {
            return stopTasks(this.memberCount);
        } catch (SynapseTaskException e) {
            logger.error("Cannot stop tasks. Error: " + e.getLocalizedMessage(), e);
            return false;
        }
    }

    /**
     * Pauses (when running) and deletes the tasks with indexes {@code 0..i-1}.
     *
     * @param i number of task indexes to process
     * @return {@code false} if the task manager is missing or not initialised,
     *         {@code true} otherwise
     */
    private boolean stopTasks(int i) {
        if (this.taskManager == null || !this.taskManager.isInitialized()) {
            return false;
        }

        for (int index = 0; index < i; index++) {
            final String taskName = TASK_PREFIX + this.name + "_" + index;

            // Only a task that exists and is currently running needs pausing first.
            if (this.taskManager.isTaskExist(taskName) && this.taskManager.isTaskRunning(taskName)) {
                this.taskManager.pause(taskName);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("ShuttingDown Message Processor Scheduler : " + this.taskManager.getName());
            }
            // Deletion is addressed by "<task name>::<task group>".
            this.taskManager.delete(taskName + "::" + MessageProcessorConstants.SCHEDULED_MESSAGE_PROCESSOR_GROUP);
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Stopped message processor [" + getName() + "].");
        }
        return true;
    }

    /**
     * Destroys the processor; delegates to {@link #destroy(boolean)} with
     * {@code false}.
     */
    @Override
    public void destroy() {
        final boolean keepState = false;
        destroy(keepState);
    }

    /**
     * Destroys the processor; delegates to {@link #destroy(boolean, boolean)} with
     * {@code false} as the second argument.
     *
     * @param z forwarded unchanged as the first argument
     */
    @Override
    public void destroy(boolean z) {
        final boolean forceStop = false;
        destroy(z, forceStop);
    }

    /**
     * Tears the processor down.
     *
     * <ol>
     *   <li>When {@code z} is {@code false} the persisted processor state is
     *       removed from the registry.</li>
     *   <li>The tasks are stopped when {@code z2} is {@code true} or {@code z} is
     *       {@code false}.</li>
     *   <li>After a one-second pause the message consumers are cleaned up and
     *       marked as not alive; a warning is logged if there are none.</li>
     *   <li>The cleanup task is sent as a cluster message.</li>
     * </ol>
     *
     * @param z  when {@code false}, delete the persisted state and stop the tasks
     * @param z2 when {@code true}, stop the tasks even if {@code z} is {@code true}
     */
    @Override
    public void destroy(boolean z, boolean z2) {
        if (!z) {
            deleteMessageProcessorState();
        }
        if (z2 || !z) {
            stop();
        }

        // NOTE(review): presumably gives in-flight task executions time to wind
        // down before the consumers are cleaned up — confirm.
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            logger.error("The thread was interrupted while sleeping", e);
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
        }

        if (getMessageConsumer() == null || this.messageConsumers.isEmpty()) {
            logger.warn("[" + getName() + "] Could not find the message consumer to cleanup.");
        } else {
            cleanupLocalResources();
            for (Iterator<MessageConsumer> it = this.messageConsumers.iterator(); it.hasNext(); ) {
                it.next().setAlive(false);
            }
        }

        this.taskManager.sendClusterMessage(getMessageProcessorCleanupTask());

        // The level now matches the guard; previously this was emitted at INFO
        // but only when DEBUG was enabled.
        if (logger.isDebugEnabled()) {
            logger.debug("Successfully destroyed message processor [" + getName() + "].");
        }
    }

    /**
     * Pauses the processor's tasks and records the {@code PAUSED} state.
     *
     * <p>Whether or not pausing succeeds, local consumer resources are cleaned up
     * and the cleanup task is sent as a cluster message afterwards.
     *
     * @return {@code false} if the task manager is missing or not initialised,
     *         {@code true} once the processor has been deactivated
     */
    @Override
    public boolean deactivate() {
        final boolean managerReady = this.taskManager != null && this.taskManager.isInitialized();
        if (!managerReady) {
            return false;
        }

        try {
            if (logger.isDebugEnabled()) {
                logger.debug("Deactivating message processor [" + getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
            }
            pauseService();
            setMessageProcessorState(ProcessorState.PAUSED);
            logger.info("Successfully deactivated the message processor [" + getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
        } finally {
            // Always release consumers and notify the cluster, even on failure.
            cleanupLocalResources();
            this.taskManager.sendClusterMessage(getMessageProcessorCleanupTask());
        }
        return true;
    }

    /**
     * Resumes the processor's tasks and records the {@code RUNNING} state.
     *
     * @return {@code false} if there is no task manager or the processor is not
     *         currently deactivated; {@code true} after a successful resume
     */
    @Override
    public boolean activate() {
        final boolean canResume = this.taskManager != null && isDeactivated();
        if (!canResume) {
            return false;
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Starting Message Processor Scheduler : " + this.taskManager.getName());
        }
        resumeService();
        setMessageProcessorState(ProcessorState.RUNNING);
        logger.info("Successfully re-activated the message processor [" + getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
        return true;
    }

    /**
     * Terminates the current task and then pauses every task of this processor
     * ({@code MSMP_<name>_0} through {@code MSMP_<name>_<memberCount-1>}).
     */
    @Override
    public void pauseService() {
        terminate();

        int member = 0;
        while (member < this.memberCount) {
            this.taskManager.pause(TASK_PREFIX + this.name + "_" + member);
            member++;
        }
    }

    /**
     * Resumes every task of this processor ({@code MSMP_<name>_0} through
     * {@code MSMP_<name>_<memberCount-1>}).
     */
    @Override
    public void resumeService() {
        int member = 0;
        while (member < this.memberCount) {
            final String taskName = TASK_PREFIX + this.name + "_" + member;
            this.taskManager.resume(taskName);
            member++;
        }
    }

    /**
     * Reports whether the first task ({@code MSMP_<name>_0}) is running or
     * blocked according to the task manager.
     *
     * @return {@code true} if the first task is running or blocked
     */
    public boolean isActive() {
        final String firstTaskName = TASK_PREFIX + this.name + "_" + "0";
        if (this.taskManager.isTaskRunning(firstTaskName)) {
            return true;
        }
        return this.taskManager.isTaskBlocked(firstTaskName);
    }

    /**
     * Reports whether the task manager knows this processor's first task.
     *
     * <p>The task name uses the same {@code MSMP_<name>_0} form that
     * {@code start()} schedules and that the other status queries in this class
     * use; the previous code omitted the underscore before the index and so asked
     * about a name that is never scheduled here.
     *
     * @return {@code true} if the first task exists in the task manager
     */
    public boolean isTaskLocationKnown() {
        return this.taskManager.isTaskExist(TASK_PREFIX + this.name + "_0");
    }

    /**
     * Asks the task manager whether the first task ({@code MSMP_<name>_0}) is
     * deactivated.
     *
     * @return the task manager's answer for the first task
     */
    @Override
    public boolean isPaused() {
        final String firstTaskName = TASK_PREFIX + this.name + "_0";
        return this.taskManager.isTaskDeactivated(firstTaskName);
    }

    /**
     * Asks the task manager whether the first task ({@code MSMP_<name>_0}) is
     * running.
     *
     * @return {@code true} if the first task is running
     */
    public boolean getActivated() {
        final String firstTaskName = TASK_PREFIX + this.name + "_0";
        return this.taskManager.isTaskRunning(firstTaskName);
    }

    /**
     * Stores the activation flag, as the string {@code "true"} or
     * {@code "false"}, under {@code IS_ACTIVATED} in the processor parameters.
     *
     * @param z the activation flag to record
     */
    private void setActivated(boolean z) {
        final String flagText = String.valueOf(z);
        this.parameters.put(MessageProcessorConstants.IS_ACTIVATED, flagText);
    }

    /**
     * Tells whether the given value is below the threshold of 1000.
     *
     * <p>NOTE: the decompiler reports that this method was declared
     * {@code protected} in the original source.
     *
     * @param j the value to test
     * @return {@code true} if {@code j} is less than 1000
     */
    public boolean isThrottling(long j) {
        final long threshold = 1000;
        return threshold > j;
    }

    /**
     * Tells whether a value was supplied at all.
     *
     * @param str the value to test
     * @return {@code true} if {@code str} is not {@code null}
     */
    public boolean isThrottling(String str) {
        if (str == null) {
            return false;
        }
        return true;
    }

    /**
     * Creates and initialises the blocking message sender, applying the optional
     * client repository ({@code "axis2.repo"}) and Axis2 configuration settings
     * found in the parameters.
     *
     * @param map the processor parameters to read the settings from
     * @return the newly created sender, which is also stored in {@code sender}
     */
    private BlockingMsgSender initMessageSender(Map<String, Object> map) {
        // Read both settings before creating the sender.
        final String clientRepository = (String) map.get("axis2.repo");
        final String axis2Xml = (String) map.get(ForwardingProcessorConstants.AXIS2_CONFIG);

        final BlockingMsgSender created = new BlockingMsgSender();
        this.sender = created;

        if (clientRepository != null) {
            created.setClientRepository(clientRepository);
        }
        if (axis2Xml != null) {
            created.setAxis2xml(axis2Xml);
        }
        created.init();
        return created;
    }

    /**
     * Supplies the task to be scheduled for one member of this processor.
     *
     * <p>{@code start()} calls this once per member and registers the returned
     * object in the task description; the most recent result is also the target of
     * {@code terminate()}.
     *
     * @return the task instance to schedule
     */
    protected abstract Task getTask();

    /**
     * Observer callback. Counts the task manager's tasks that belong to this
     * processor, stops all of them if there are more than {@code memberCount},
     * and then (re)starts the processor.
     *
     * <p>A task belongs to this processor when its name starts with {@code MSMP_}
     * and, with the part from the last underscore onwards removed, equals
     * {@code MSMP_<name>}.
     */
    @Override
    public void update() {
        final String ownTaskStem = TASK_PREFIX + this.name;

        int matching = 0;
        for (String taskName : this.taskManager.getTaskNames()) {
            if (!taskName.startsWith(TASK_PREFIX)) {
                continue;
            }
            String stem = taskName.substring(0, taskName.lastIndexOf("_"));
            if (stem.equals(ownTaskStem)) {
                matching++;
            }
        }

        if (matching > this.memberCount) {
            // More tasks exist than are configured: remove them all first.
            stopTasks(matching);
        }
        start();
    }

    /**
     * Calls {@code cleanup()} on every registered message consumer; does nothing
     * when the consumer collection is {@code null}.
     */
    @Override
    public void cleanupLocalResources() {
        if (this.messageConsumers == null) {
            return;
        }
        for (Iterator<MessageConsumer> consumers = this.messageConsumers.iterator(); consumers.hasNext(); ) {
            consumers.next().cleanup();
        }
    }

    /**
     * Looks up a {@link MessageProcessorCleanupService} through
     * {@link ServiceLoader} and prepares it for this processor.
     *
     * <p>Only the first provider found is used; it is given this processor's name
     * before being returned.
     *
     * @return the configured cleanup service, or {@code null} when no provider is
     *         available
     */
    private Callable<Void> getMessageProcessorCleanupTask() {
        if (logger.isDebugEnabled()) {
            logger.debug("Trying to fetch MessageProcessorCleanupService from classpath.. ");
        }

        Iterator<MessageProcessorCleanupService> providers =
                ServiceLoader.load(MessageProcessorCleanupService.class).iterator();
        if (!providers.hasNext()) {
            return null;
        }

        // The provider is used directly: the old null check came after the first
        // dereference and could never be false.
        MessageProcessorCleanupService cleanupService = providers.next();
        cleanupService.setName(this.name);
        if (logger.isDebugEnabled()) {
            logger.debug("Message Processor Cleanup Service found  : " + cleanupService.getClass().getName());
        }
        return cleanupService;
    }

    /**
     * Reads the processor state persisted in the registry.
     *
     * @return {@code INITIAL} when the registry has no properties for this
     *         processor; {@code RUNNING} when the stored state equals "RUNNING"
     *         ignoring case; {@code PAUSED} in every other case
     */
    private ProcessorState getProcessorState() {
        final String resourcePath = REG_PROCESSOR_BASE_PATH + getName();
        final Properties stored = this.registry.getResourceProperties(resourcePath);
        if (stored == null) {
            return ProcessorState.INITIAL;
        }

        final String storedState = stored.getProperty(MP_STATE);
        if (ProcessorState.RUNNING.toString().equalsIgnoreCase(storedState)) {
            return ProcessorState.RUNNING;
        }
        return ProcessorState.PAUSED;
    }

    /**
     * Writes the given state to this processor's registry resource as a
     * {@code text/plain} resource with the {@code MESSAGE_PROCESSOR_STATE}
     * property name.
     *
     * @param processorState the state to persist
     */
    private void setMessageProcessorState(ProcessorState processorState) {
        final String resourcePath = REG_PROCESSOR_BASE_PATH + getName();
        final String stateText = processorState.toString();
        this.registry.newNonEmptyResource(resourcePath, false, "text/plain", stateText, MP_STATE);
    }

    /**
     * Removes this processor's state resource from the registry, if the registry
     * reports properties for it.
     */
    private void deleteMessageProcessorState() {
        final String resourcePath = REG_PROCESSOR_BASE_PATH + getName();
        if (this.registry.getResourceProperties(resourcePath) == null) {
            // Nothing persisted for this processor.
            return;
        }
        this.registry.delete(resourcePath);
    }

    /**
     * Calls {@code terminate()} on the current task when it is a forwarding,
     * sampling or failover-forwarding service; other task types are left alone.
     * The type checks run in that order and only the first match is acted on.
     */
    private void terminate() {
        if (this.task instanceof ForwardingService) {
            ((ForwardingService) this.task).terminate();
            return;
        }
        if (this.task instanceof SamplingService) {
            ((SamplingService) this.task).terminate();
            return;
        }
        if (this.task instanceof FailoverForwardingService) {
            ((FailoverForwardingService) this.task).terminate();
        }
    }

    /**
     * Terminates the current task and records the {@code PAUSED} state; local
     * consumer resources are cleaned up afterwards even if either step fails.
     */
    @Override
    public void cleanUpDeactivatedProcessors() {
        try {
            terminate();
            final ProcessorState pausedState = ProcessorState.PAUSED;
            setMessageProcessorState(pausedState);
        } finally {
            // Runs whether or not the steps above threw.
            cleanupLocalResources();
        }
    }

    /**
     * Records the {@code RUNNING} state in the registry without touching any
     * tasks.
     */
    @Override
    public void resumeRemotely() {
        final ProcessorState runningState = ProcessorState.RUNNING;
        setMessageProcessorState(runningState);
    }
}
