package org.apache.synapse.message.processor.impl.sampler;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseException;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.message.MessageConsumer;
import org.apache.synapse.message.processor.MessageProcessor;
import org.apache.synapse.task.Task;
import org.apache.tools.ant.util.FileUtils;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:WEB-INF/lib/synapse-core-2.1.7-wso2v3-BETA.jar:org/apache/synapse/message/processor/impl/sampler/SamplingService.class */
/**
 * Task that, on each execution, polls the message store consumer of a
 * {@link MessageProcessor} and hands every received message to a configured
 * sequence through the executor service of the message's environment.
 *
 * <p>The service initializes itself lazily on the first {@link #execute()} call.
 * Up to {@code concurrency} messages are fetched per execution; the value is read
 * from the processor parameter named by {@code concurrencyPropName}, and the
 * target sequence key from the parameter named by {@code sequencePropName}.
 *
 * <p>NOTE(review): the mutable state of this class is not synchronized; it
 * presumably relies on the scheduler not running the same instance concurrently —
 * confirm against the task scheduler in use.
 */
public class SamplingService implements Task, ManagedLifecycle {
    private static final Log log = LogFactory.getLog(SamplingService.class);

    /** Number of messages polled per execution when no valid concurrency parameter is configured. */
    private static final int DEFAULT_CONCURRENCY = 1;

    /**
     * Delay, in milliseconds, applied before the first execution when the service
     * was created as "deactivated at startup". Same value the decompiled code
     * obtained from an unrelated Ant constant.
     */
    private static final long INACTIVE_STARTUP_DELAY_MILLIS = 2000L;

    /** Closing bracket appended after the processor name in log messages. */
    private static final String NAME_SUFFIX = "]";

    private MessageConsumer messageConsumer;
    private final MessageProcessor messageProcessor;
    private int concurrency = DEFAULT_CONCURRENCY;
    private String sequence;
    private final SynapseEnvironment synapseEnvironment;
    private boolean initialized;
    private final String concurrencyPropName;
    private final String sequencePropName;
    private boolean isDeactivatedAtStartup;

    /**
     * Creates a service that is not flagged as deactivated at startup.
     *
     * @param messageProcessor   processor whose message store is sampled
     * @param synapseEnvironment environment used to look up the message store
     * @param str                name of the processor parameter holding the concurrency
     * @param str2               name of the processor parameter holding the sequence key
     */
    public SamplingService(MessageProcessor messageProcessor, SynapseEnvironment synapseEnvironment, String str, String str2) {
        this(messageProcessor, synapseEnvironment, str, str2, false);
    }

    /**
     * Creates a service.
     *
     * @param messageProcessor   processor whose message store is sampled
     * @param synapseEnvironment environment used to look up the message store
     * @param str                name of the processor parameter holding the concurrency
     * @param str2               name of the processor parameter holding the sequence key
     * @param z                  {@code true} if the first execution must be preceded by
     *                           the startup delay
     */
    public SamplingService(MessageProcessor messageProcessor, SynapseEnvironment synapseEnvironment, String str, String str2, boolean z) {
        this.messageProcessor = messageProcessor;
        this.synapseEnvironment = synapseEnvironment;
        this.concurrencyPropName = str;
        this.sequencePropName = str2;
        this.isDeactivatedAtStartup = z;
    }

    /**
     * Runs one sampling cycle: applies the one-time startup delay if required,
     * initializes the service if needed, then fetches and dispatches up to
     * {@code concurrency} messages unless the processor is deactivated.
     *
     * <p>Any {@link Throwable} raised while polling is logged and causes the
     * message processor to be stopped. If the startup delay is interrupted, the
     * interrupt status is restored and no messages are polled in this cycle.
     */
    @Override
    public void execute() {
        if (awaitStartupDelay()) {
            pollAndDispatch();
        }
        if (log.isDebugEnabled()) {
            log.debug("Exiting service thread of message processor [" + this.messageProcessor.getName() + NAME_SUFFIX);
        }
    }

    /**
     * Applies the one-time delay used when the service was started as inactive.
     *
     * @return {@code false} if the delay was interrupted, {@code true} otherwise
     */
    private boolean awaitStartupDelay() {
        if (!this.isDeactivatedAtStartup) {
            return true;
        }
        boolean completed = true;
        try {
            TimeUnit.MILLISECONDS.sleep(INACTIVE_STARTUP_DELAY_MILLIS);
        } catch (InterruptedException e) {
            log.warn("Initial delay interrupted when Sampling service started as inactive ", e);
            // Restore the flag so the owner of this thread can observe the interruption.
            Thread.currentThread().interrupt();
            completed = false;
        }
        // The delay is attempted only once, whether or not it ran to completion.
        this.isDeactivatedAtStartup = false;
        return completed;
    }

    /** Initializes lazily, then fetches and dispatches messages; stops the processor on any failure. */
    private void pollAndDispatch() {
        try {
            if (!this.initialized) {
                init(this.synapseEnvironment);
            }
            if (this.messageProcessor.isDeactivated()) {
                if (log.isDebugEnabled()) {
                    log.debug("Exiting service since the message processor is deactivated");
                }
                return;
            }
            for (int i = 0; i < this.concurrency; i++) {
                MessageContext message = fetch(this.messageConsumer);
                if (message != null) {
                    dispatch(message);
                } else if (log.isDebugEnabled()) {
                    log.debug("No messages were received for message processor [" + this.messageProcessor.getName() + NAME_SUFFIX);
                }
            }
        } catch (Throwable th) {
            log.fatal("Deactivating the message processor [" + this.messageProcessor.getName() + NAME_SUFFIX, th);
            this.messageProcessor.stop();
        }
    }

    /**
     * Obtains the message consumer and reads the sequence key and concurrency
     * from the processor parameters, then marks the service as initialized.
     *
     * <p>An unparsable concurrency value is removed from the parameters, an error
     * is logged and the default concurrency is used.
     *
     * @param synapseEnvironment not used; the environment supplied to the
     *                           constructor is used instead
     */
    @Override
    public void init(SynapseEnvironment synapseEnvironment) {
        setMessageConsumer();
        Map<String, Object> parameters = this.messageProcessor.getParameters();
        this.sequence = (String) parameters.get(this.sequencePropName);
        String configuredConcurrency = (String) parameters.get(this.concurrencyPropName);
        if (configuredConcurrency != null) {
            try {
                this.concurrency = Integer.parseInt(configuredConcurrency);
            } catch (NumberFormatException e) {
                parameters.remove(this.concurrencyPropName);
                // Make the state match the log message even when init() runs again
                // after an earlier, valid value had been applied.
                this.concurrency = DEFAULT_CONCURRENCY;
                log.error("Invalid value for concurrency switching back to default value", e);
            }
        }
        this.initialized = true;
    }

    /**
     * Receives one message from a consumer and acknowledges it if one was received.
     *
     * @param messageConsumer consumer to read from; when {@code null}, the consumer
     *                        obtained during {@link #init(SynapseEnvironment)} is used
     * @return the received message, or {@code null} if the consumer returned none
     */
    public MessageContext fetch(MessageConsumer messageConsumer) {
        MessageConsumer consumer = messageConsumer != null ? messageConsumer : this.messageConsumer;
        MessageContext message = consumer.receive();
        if (message != null) {
            consumer.ack();
        }
        return message;
    }

    /**
     * Submits a task to the executor service of the message's environment that
     * mediates the message through the configured sequence.
     *
     * <p>A {@link SynapseException} raised by the sequence is passed to the top
     * fault handler of the message's fault stack, if any, and logged; any other
     * {@link Throwable} is logged. If the sequence cannot be resolved a warning
     * is logged and the message is not mediated.
     *
     * @param messageContext message to mediate
     */
    public void dispatch(final MessageContext messageContext) {
        messageContext.getEnvironment().getExecutorService().submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Mediator target = messageContext.getSequence(SamplingService.this.sequence);
                    if (target != null) {
                        target.mediate(messageContext);
                    } else {
                        // fetch() acknowledges before dispatch, so leave a trace instead of
                        // dropping the message silently.
                        SamplingService.log.warn("Sequence [" + SamplingService.this.sequence
                                + "] was not found; the message was not mediated");
                    }
                } catch (SynapseException e) {
                    if (!messageContext.getFaultStack().isEmpty()) {
                        messageContext.getFaultStack().pop().handleFault(messageContext, e);
                    }
                    SamplingService.log.error("Error occurred while executing the message", e);
                } catch (Throwable th) {
                    SamplingService.log.error("Error occurred while executing the message", th);
                }
            }
        });
    }

    /**
     * Cleans up the message consumer, if one has been obtained.
     *
     * @return always {@code true}
     */
    public boolean terminate() {
        // init() is lazy, so the consumer may not exist yet when the service is destroyed.
        if (this.messageConsumer != null) {
            this.messageConsumer.cleanup();
        }
        return true;
    }

    /**
     * Looks up the consumer of the processor's message store and registers it
     * with the processor.
     *
     * @return the result of {@code MessageProcessor.setMessageConsumer}
     */
    private boolean setMessageConsumer() {
        this.messageConsumer = this.synapseEnvironment.getSynapseConfiguration().getMessageStore(this.messageProcessor.getMessageStoreName()).getConsumer();
        return this.messageProcessor.setMessageConsumer(this.messageConsumer);
    }

    /**
     * @return {@code true} once {@link #init(SynapseEnvironment)} has completed
     */
    public boolean isInitialized() {
        return this.initialized;
    }

    /** Releases the message consumer by delegating to {@link #terminate()}. */
    @Override
    public void destroy() {
        terminate();
    }
}
