package org.mule.processor;

import java.beans.ExceptionListener;
import java.text.MessageFormat;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import org.mule.DefaultMuleEvent;
import org.mule.FailedToQueueEventException;
import org.mule.api.MessagingException;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.NamedObject;
import org.mule.api.context.WorkManagerSource;
import org.mule.api.lifecycle.InitialisationException;
import org.mule.api.lifecycle.Lifecycle;
import org.mule.api.lifecycle.LifecycleException;
import org.mule.api.lifecycle.LifecycleState;
import org.mule.config.QueueProfile;
import org.mule.config.i18n.CoreMessages;
import org.mule.config.i18n.MessageFactory;
import org.mule.management.stats.QueueStatistics;
import org.mule.service.Pausable;
import org.mule.util.concurrent.WaitableBoolean;
import org.mule.util.queue.Queue;
import org.mule.util.queue.QueueSession;
import org.mule.work.AbstractMuleEventWork;

/* loaded from: input_file:WEB-INF/lib/mule-core-3.0.0-M4.jar:org/mule/processor/SedaStageInterceptingMessageProcessor.class */
public class SedaStageInterceptingMessageProcessor extends AsyncInterceptingMessageProcessor implements WorkListener, Work, Lifecycle {
    protected static final String QUEUE_NAME_PREFIX = "seda.queue";
    protected QueueProfile queueProfile;
    protected int queueTimeout;
    protected LifecycleState lifecycleState;
    protected QueueStatistics queueStatistics;
    protected MuleContext muleContext;
    protected String name;
    protected Queue queue;
    private WaitableBoolean queueDraining;

    /* loaded from: input_file:WEB-INF/lib/mule-core-3.0.0-M4.jar:org/mule/processor/SedaStageInterceptingMessageProcessor$SedaStageWorker.class */
    private class SedaStageWorker extends AbstractMuleEventWork {
        public SedaStageWorker(MuleEvent muleEvent) {
            super(muleEvent);
        }

        @Override // org.mule.work.AbstractMuleEventWork
        protected void doRun() {
            try {
                SedaStageInterceptingMessageProcessor.this.processNext(this.event);
            } catch (Exception e) {
                this.event.getSession().setValid(false);
                if (e instanceof MessagingException) {
                    SedaStageInterceptingMessageProcessor.this.exceptionListener.exceptionThrown(e);
                } else {
                    SedaStageInterceptingMessageProcessor.this.exceptionListener.exceptionThrown(new MessagingException(CoreMessages.eventProcessingFailedFor(SedaStageInterceptingMessageProcessor.this.getStageDescription()), this.event.getMessage(), e));
                }
            }
        }
    }

    public SedaStageInterceptingMessageProcessor(String str, QueueProfile queueProfile, int i, WorkManagerSource workManagerSource, boolean z, LifecycleState lifecycleState, ExceptionListener exceptionListener, QueueStatistics queueStatistics, MuleContext muleContext) {
        super(workManagerSource, z, exceptionListener);
        this.queueDraining = new WaitableBoolean(false);
        this.name = str;
        this.queueProfile = queueProfile;
        this.queueTimeout = i;
        this.lifecycleState = lifecycleState;
        this.queueStatistics = queueStatistics;
        this.muleContext = muleContext;
    }

    @Override // org.mule.processor.AsyncInterceptingMessageProcessor, org.mule.api.processor.MessageProcessor
    public MuleEvent process(MuleEvent muleEvent) throws MuleException {
        if (this.next == null) {
            return muleEvent;
        }
        if (muleEvent.isSynchronous() || muleEvent.getEndpoint().getTransactionConfig().isTransacted()) {
            return processNext(muleEvent);
        }
        processAsync(muleEvent);
        return null;
    }

    @Override // org.mule.processor.AsyncInterceptingMessageProcessor
    protected void processAsync(MuleEvent muleEvent) throws MuleException {
        try {
            if (isStatsEnabled()) {
                this.queueStatistics.incQueuedEvent();
            }
            enqueue(muleEvent);
        } catch (Exception e) {
            this.exceptionListener.exceptionThrown(new FailedToQueueEventException(CoreMessages.interruptedQueuingEventFor(getStageDescription()), muleEvent.getMessage(), e));
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("MuleEvent added to queue for: " + getStageDescription());
        }
    }

    protected boolean isStatsEnabled() {
        return this.queueStatistics != null && this.queueStatistics.isEnabled();
    }

    protected void enqueue(MuleEvent muleEvent) throws Exception {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(MessageFormat.format("{1}: Putting event on queue {2}", this.queue.getName(), getStageDescription(), muleEvent));
        }
        this.queue.put(muleEvent);
    }

    protected MuleEvent dequeue() throws Exception {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace(MessageFormat.format("{0}: Polling queue {1}, timeout = {2}", getStageName(), getStageDescription(), Integer.valueOf(this.queueTimeout)));
        }
        MuleEvent muleEvent = (MuleEvent) this.queue.poll(this.queueTimeout);
        if (muleEvent == null || !this.lifecycleState.isPhaseComplete(Pausable.PHASE_NAME)) {
            return muleEvent;
        }
        this.queue.untake(muleEvent);
        return null;
    }

    @Override // java.lang.Runnable
    public void run() {
        DefaultMuleEvent defaultMuleEvent = null;
        QueueSession queueSession = this.muleContext.getQueueManager().getQueueSession();
        while (!this.lifecycleState.isStopped()) {
            try {
                if (this.lifecycleState.isPhaseComplete(Pausable.PHASE_NAME)) {
                    waitIfPaused();
                    if (this.lifecycleState.isStopping()) {
                        this.queueDraining.set(true);
                        if (!isQueuePersistent() && queueSession != null && getQueueSize() > 0) {
                            this.logger.warn(CoreMessages.stopPausedSedaStageNonPeristentQueueMessageLoss(getQueueSize(), getQueueName()));
                        }
                        this.queueDraining.set(false);
                        return;
                    }
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    this.queueDraining.set(false);
                    return;
                } else if (e instanceof MuleException) {
                    this.exceptionListener.exceptionThrown(e);
                } else {
                    this.exceptionListener.exceptionThrown(new MessagingException(CoreMessages.eventProcessingFailedFor(getStageDescription()), defaultMuleEvent == null ? null : defaultMuleEvent.getMessage(), e));
                }
            }
            if (this.lifecycleState.isStopping() && (isQueuePersistent() || queueSession == null || getQueueSize() <= 0)) {
                this.queueDraining.set(false);
                return;
            }
            defaultMuleEvent = (DefaultMuleEvent) dequeue();
            if (defaultMuleEvent != null) {
                if (isStatsEnabled()) {
                    this.queueStatistics.decQueuedEvent();
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(MessageFormat.format("{0}: Dequeued event from {1}", getStageDescription(), getQueueName()));
                }
                SedaStageWorker sedaStageWorker = new SedaStageWorker(defaultMuleEvent);
                if (this.doThreading) {
                    this.workManagerSource.getWorkManager().scheduleWork(sedaStageWorker, Long.MAX_VALUE, null, this);
                } else {
                    sedaStageWorker.run();
                }
            }
        }
    }

    protected boolean isQueuePersistent() {
        return this.queueProfile.isPersistent();
    }

    public int getQueueSize() {
        return this.queue.size();
    }

    protected String getQueueName() {
        return String.format("%s(%s)", QUEUE_NAME_PREFIX, getStageName());
    }

    protected String getStageName() {
        return this.name != null ? this.name : this.next instanceof NamedObject ? ((NamedObject) this.next).getName() : String.format("%s.%s", this.next.getClass().getName(), Integer.valueOf(this.next.hashCode()));
    }

    protected String getStageDescription() {
        return "SEDA Stage " + getStageName();
    }

    protected void waitIfPaused() throws InterruptedException {
        if (this.logger.isDebugEnabled() && this.lifecycleState.isPhaseComplete(Pausable.PHASE_NAME)) {
            this.logger.debug(getStageDescription() + " is paused. Polling halted until resumed is called");
        }
        while (this.lifecycleState.isPhaseComplete(Pausable.PHASE_NAME) && !this.lifecycleState.isStopping()) {
            Thread.sleep(500L);
        }
    }

    @Override // javax.resource.spi.work.Work
    public void release() {
        this.queueDraining.set(false);
    }

    @Override // org.mule.api.lifecycle.Initialisable
    public void initialise() throws InitialisationException {
        if (this.next == null) {
            throw new IllegalStateException("Next message processor cannot be null with this InterceptingMessageProcessor");
        }
        this.queueProfile.configureQueue(getQueueName(), this.muleContext.getQueueManager());
        this.queue = this.muleContext.getQueueManager().getQueueSession().getQueue(getQueueName());
        if (this.queue == null) {
            throw new InitialisationException(MessageFactory.createStaticMessage("Queue not created for " + getStageDescription()), this);
        }
    }

    @Override // org.mule.api.lifecycle.Startable
    public void start() throws MuleException {
        if (this.queue == null) {
            throw new IllegalStateException("Not initialised");
        }
        try {
            this.workManagerSource.getWorkManager().scheduleWork(this, Long.MAX_VALUE, null, this);
        } catch (WorkException e) {
            throw new LifecycleException(CoreMessages.failedToStart(getStageDescription()), e, this);
        }
    }

    @Override // org.mule.api.lifecycle.Stoppable
    public void stop() throws MuleException {
        if (this.queue == null || this.queue.size() <= 0) {
            return;
        }
        try {
            this.queueDraining.whenFalse(null);
        } catch (InterruptedException e) {
        }
    }

    @Override // org.mule.api.lifecycle.Disposable
    public void dispose() {
        this.queue = null;
    }
}
