package org.apache.synapse.message.processor.impl.failover;

import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseException;
import org.apache.synapse.commons.json.JsonUtil;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.message.MessageConsumer;
import org.apache.synapse.message.MessageProducer;
import org.apache.synapse.message.StoreForwardException;
import org.apache.synapse.message.processor.MessageProcessor;
import org.apache.synapse.message.processor.MessageProcessorConstants;
import org.apache.synapse.task.Task;
import org.apache.synapse.util.MessageHelper;
import org.apache.tools.ant.util.FileUtils;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:WEB-INF/lib/synapse-core-2.1.7-wso2v167.jar:org/apache/synapse/message/processor/impl/failover/FailoverForwardingService.class */
public class FailoverForwardingService implements Task, ManagedLifecycle {
    private static final Log log = LogFactory.getLog(FailoverForwardingService.class);
    private MessageConsumer messageConsumer;
    private MessageProducer targetMessageProducer;
    private String targetMessageStoreName;
    private MessageProcessor messageProcessor;
    private long interval;
    private SynapseEnvironment synapseEnvironment;
    private boolean isDeactivatedAtStartup;
    private int retryInterval = 1000;
    private String faultSeq = null;
    private String deactivateSeq = null;
    private String cronExpression = null;
    private boolean isSuccessful = false;
    private volatile boolean isTerminated = false;
    private int maxDeliverAttempts = -1;
    private int attemptCount = 0;
    private boolean isThrottling = true;
    private long throttlingInterval = -1;
    private boolean isMaxDeliveryAttemptDropEnabled = false;
    private boolean initialized = false;

    public FailoverForwardingService(MessageProcessor messageProcessor, SynapseEnvironment synapseEnvironment, long j, boolean z) {
        this.isDeactivatedAtStartup = false;
        this.messageProcessor = messageProcessor;
        this.synapseEnvironment = synapseEnvironment;
        this.interval = j;
        this.isDeactivatedAtStartup = z;
    }

    @Override // org.apache.synapse.task.Task
    public void execute() {
        long time = new Date().getTime();
        if (this.isDeactivatedAtStartup) {
            try {
                TimeUnit.MILLISECONDS.sleep(FileUtils.FAT_FILE_TIMESTAMP_GRANULARITY);
            } catch (InterruptedException e) {
                log.warn("Initial delay interrupted when Failover Forwarding service started as inactive ", e);
            }
            this.isDeactivatedAtStartup = false;
        }
        try {
            if (!this.initialized) {
                init(this.synapseEnvironment);
            }
            do {
                resetService();
                try {
                    if (!this.messageProcessor.isDeactivated()) {
                        MessageContext fetch = fetch(this.messageConsumer);
                        if (fetch == null) {
                            if (log.isDebugEnabled()) {
                                log.debug("No messages were received for message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
                            }
                            if (isRunningUnderCronExpression()) {
                                break;
                            }
                        } else {
                            this.isTerminated = this.messageProcessor.isDeactivated();
                            dispatch(fetch);
                        }
                    } else {
                        this.isTerminated = true;
                        if (log.isDebugEnabled()) {
                            log.debug("Exiting service since the message processor is deactivated");
                        }
                    }
                } catch (Throwable th) {
                    log.fatal("Deactivating the message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX, th);
                    deactivateMessageProcessor(null);
                }
                if (log.isDebugEnabled()) {
                    log.debug("Exiting the iteration of message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
                }
                if (isRunningUnderCronExpression()) {
                    try {
                        Thread.sleep(this.throttlingInterval);
                    } catch (InterruptedException e2) {
                        log.debug("Current Thread was interrupted while it is sleeping.");
                    }
                }
                if (this.interval > 0 && this.interval < 1000) {
                    try {
                        Thread.sleep(this.interval);
                    } catch (InterruptedException e3) {
                        log.debug("Current Thread was interrupted while it is sleeping.");
                    }
                }
                if ((this.isThrottling && new Date().getTime() - time > 1000) || (!this.isThrottling && !isRunningUnderCronExpression())) {
                    break;
                }
            } while (!this.isTerminated);
            if (log.isDebugEnabled()) {
                log.debug("Exiting service thread of message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
            }
        } catch (SynapseException e4) {
            throw new SynapseException("Error while initializing forwarding service " + this.targetMessageStoreName, e4);
        }
    }

    @Override // org.apache.synapse.ManagedLifecycle
    public void init(SynapseEnvironment synapseEnvironment) throws SynapseException {
        try {
            setMessageConsumerAndProducer();
            Map<String, Object> parameters = this.messageProcessor.getParameters();
            if (parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS) != null) {
                this.maxDeliverAttempts = Integer.parseInt((String) parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS));
            }
            if (parameters.get(MessageProcessorConstants.RETRY_INTERVAL) != null) {
                this.retryInterval = Integer.parseInt((String) parameters.get(MessageProcessorConstants.RETRY_INTERVAL));
            }
            this.faultSeq = (String) parameters.get("message.processor.fault.sequence");
            this.deactivateSeq = (String) parameters.get("message.processor.deactivate.sequence");
            if (parameters.get("throttle") != null) {
                this.isThrottling = Boolean.parseBoolean((String) parameters.get("throttle"));
            }
            if (parameters.get("cron.expression") != null) {
                this.cronExpression = String.valueOf(parameters.get("cron.expression"));
            }
            if (this.cronExpression != null && parameters.get("throttle.interval") != null) {
                this.throttlingInterval = Long.parseLong((String) parameters.get("throttle.interval"));
            }
            if (parameters.get("max.delivery.drop") != null && parameters.get("max.delivery.drop").toString().equals(Sandesha2Constants.Assertions.ELEM_ENABLED) && this.maxDeliverAttempts > 0) {
                this.isMaxDeliveryAttemptDropEnabled = true;
            }
            this.interval = Long.parseLong((String) parameters.get(MessageProcessorConstants.INTERVAL));
            this.initialized = true;
        } catch (StoreForwardException e) {
            throw new SynapseException("Error while initializing Message Consumer " + this.messageProcessor.getName() + "and Message Producer " + this.targetMessageStoreName, e);
        }
    }

    public MessageContext fetch(MessageConsumer messageConsumer) throws StoreForwardException {
        return this.messageConsumer.receive();
    }

    public void dispatch(MessageContext messageContext) {
        OMElement firstElement;
        if (log.isDebugEnabled()) {
            log.debug("Sending the message to client with message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
        }
        SOAPEnvelope envelope = messageContext.getEnvelope();
        if (this.targetMessageStoreName == null) {
            log.warn("Property message.target.store.name not found in the message context , Hence removing the message ");
            this.messageConsumer.ack();
            return;
        }
        while (!this.isSuccessful && !this.isTerminated) {
            try {
                try {
                    messageContext.setEnvelope(MessageHelper.cloneSOAPEnvelope(envelope));
                    OMElement oMElement = null;
                    org.apache.axis2.context.MessageContext axis2MessageContext = ((Axis2MessageContext) messageContext).getAxis2MessageContext();
                    if (JsonUtil.hasAJsonPayload(axis2MessageContext)) {
                        oMElement = axis2MessageContext.getEnvelope().getBody().getFirstElement();
                    }
                    if (JsonUtil.hasAJsonPayload(oMElement) && (firstElement = messageContext.getEnvelope().getBody().getFirstElement()) != null) {
                        firstElement.detach();
                        messageContext.getEnvelope().getBody().addChild(oMElement);
                    }
                    if (this.messageConsumer != null && this.messageConsumer.isAlive() && this.targetMessageStoreName != null) {
                        this.targetMessageProducer = this.synapseEnvironment.getSynapseConfiguration().getMessageStore(this.targetMessageStoreName).getProducer();
                        if (this.targetMessageProducer != null) {
                            this.isSuccessful = this.targetMessageProducer.storeMessage(messageContext);
                        } else {
                            this.isSuccessful = false;
                        }
                    }
                } catch (Exception e) {
                    log.error("Message store messageSender of message processor [" + this.messageProcessor.getName() + "] failed to send message to the target message store");
                    sendThroughFaultSeq(messageContext);
                }
                if (this.isSuccessful) {
                    this.messageConsumer.ack();
                    this.attemptCount = 0;
                    if (log.isDebugEnabled()) {
                        log.debug("Successfully sent the message to message store [" + this.targetMessageStoreName + "] with message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
                    }
                    if (this.messageProcessor.isPaused()) {
                        this.messageProcessor.resumeService();
                        log.info("Resuming the service of message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
                    }
                } else {
                    prepareToRetry(messageContext);
                }
            } catch (Exception e2) {
                log.error("Message processor [" + this.messageProcessor.getName() + "] failed to send the message to target store", e2);
                return;
            }
        }
    }

    public void sendThroughFaultSeq(MessageContext messageContext) {
        if (this.faultSeq == null) {
            log.warn("Failed to send the message through the fault sequence. Sequence name does not Exist.");
            return;
        }
        Mediator sequence = messageContext.getSequence(this.faultSeq);
        if (sequence == null) {
            log.warn("Failed to send the message through the fault sequence. Sequence [" + this.faultSeq + "] does not Exist.");
        } else {
            sequence.mediate(messageContext);
        }
    }

    public void sendThroughDeactivateSeq(MessageContext messageContext) {
        if (this.deactivateSeq == null) {
            log.warn("Failed to send the message through the deactivate sequence. Sequence name does not Exist.");
            return;
        }
        Mediator sequence = messageContext.getSequence(this.deactivateSeq);
        if (sequence == null) {
            log.warn("Failed to send the message through the deactivate sequence. Sequence [" + this.deactivateSeq + "] does not Exist.");
        } else {
            sequence.mediate(messageContext);
        }
    }

    public boolean terminate() {
        try {
            this.isTerminated = true;
            if (!log.isDebugEnabled()) {
                return true;
            }
            log.debug("Successfully terminated job of message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
            return true;
        } catch (Exception e) {
            log.error("Failed to terminate the job of message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
            return false;
        }
    }

    private void checkAndDeactivateProcessor(MessageContext messageContext) {
        if (this.maxDeliverAttempts > 0) {
            this.attemptCount++;
            if (this.attemptCount >= this.maxDeliverAttempts) {
                if (this.isMaxDeliveryAttemptDropEnabled) {
                    dropMessageAndContinueMessageProcessor();
                    if (log.isDebugEnabled()) {
                        log.debug("Message processor [" + this.messageProcessor.getName() + "] Dropped the failed message and continue due to reach of max attempts");
                        return;
                    }
                    return;
                }
                terminate();
                deactivateMessageProcessor(messageContext);
                if (log.isDebugEnabled()) {
                    log.debug("Message processor [" + this.messageProcessor.getName() + "] stopped due to reach of max attempts");
                }
            }
        }
    }

    private void prepareToRetry(MessageContext messageContext) {
        if (this.isTerminated) {
            return;
        }
        checkAndDeactivateProcessor(messageContext);
        if (log.isDebugEnabled()) {
            log.debug("Failed to send to target store retrying after " + this.retryInterval + "s with attempt count - " + this.attemptCount);
        }
        try {
            Thread.sleep(this.retryInterval);
        } catch (InterruptedException e) {
        }
    }

    private void deactivateMessageProcessor(MessageContext messageContext) {
        sendThroughDeactivateSeq(messageContext);
        this.messageProcessor.deactivate();
    }

    private void resetService() {
        this.isSuccessful = false;
        this.attemptCount = 0;
    }

    private boolean isRunningUnderCronExpression() {
        return this.cronExpression != null && this.throttlingInterval > -1;
    }

    private void dropMessageAndContinueMessageProcessor() {
        this.messageConsumer.ack();
        this.attemptCount = 0;
        this.isSuccessful = true;
        if (this.messageProcessor.isPaused()) {
            this.messageProcessor.resumeService();
        }
        log.info("Removed failed message and continue the message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
    }

    private boolean setMessageConsumerAndProducer() throws StoreForwardException {
        this.messageConsumer = this.synapseEnvironment.getSynapseConfiguration().getMessageStore(this.messageProcessor.getMessageStoreName()).getConsumer();
        if (this.messageProcessor.getParameters().get(FailoverForwardingProcessorConstants.TARGET_MESSAGE_STORE) != null) {
            this.targetMessageStoreName = (String) this.messageProcessor.getParameters().get(FailoverForwardingProcessorConstants.TARGET_MESSAGE_STORE);
        }
        return this.messageProcessor.setMessageConsumer(this.messageConsumer);
    }

    public boolean isInitialized() {
        return this.initialized;
    }

    @Override // org.apache.synapse.ManagedLifecycle
    public void destroy() {
        terminate();
    }
}
