package org.apache.synapse.message.processor.impl.forwarder;

import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.transport.http.HTTPConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.commons.json.JsonUtil;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.message.MessageConsumer;
import org.apache.synapse.message.processor.MessageProcessor;
import org.apache.synapse.message.processor.MessageProcessorConstants;
import org.apache.synapse.message.senders.blocking.BlockingMsgSender;
import org.apache.synapse.task.Task;
import org.apache.synapse.util.MessageHelper;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:WEB-INF/lib/synapse-core-2.1.3-wso2v2.jar:org/apache/synapse/message/processor/impl/forwarder/ForwardingService.class */
public class ForwardingService implements Task, ManagedLifecycle {
    private static final Log log = LogFactory.getLog(ForwardingService.class);
    private MessageConsumer messageConsumer;
    private MessageProcessor messageProcessor;
    private BlockingMsgSender sender;
    private long interval;
    private SynapseEnvironment synapseEnvironment;
    private int retryInterval = 1000;
    private String faultSeq = null;
    private String replySeq = null;
    private String targetEndpoint = null;
    private String cronExpression = null;
    private String[] nonRetryStatusCodes = null;
    private boolean isSuccessful = false;
    private volatile boolean isTerminated = false;
    private int maxDeliverAttempts = -1;
    private int attemptCount = 0;
    private boolean isThrottling = true;
    private long throttlingInterval = -1;
    private boolean isMaxDeliveryAttemptDropEnabled = false;
    private boolean initialized = false;

    public ForwardingService(MessageProcessor messageProcessor, BlockingMsgSender blockingMsgSender, SynapseEnvironment synapseEnvironment, long j) {
        this.messageProcessor = messageProcessor;
        this.sender = blockingMsgSender;
        this.synapseEnvironment = synapseEnvironment;
        this.interval = j;
    }

    @Override // org.apache.synapse.task.Task
    public void execute() {
        long time = new Date().getTime();
        if (!this.initialized) {
            init(this.synapseEnvironment);
        }
        do {
            resetService();
            try {
                if (!this.messageProcessor.isDeactivated()) {
                    MessageContext fetch = fetch(this.messageConsumer);
                    if (fetch == null) {
                        if (log.isDebugEnabled()) {
                            log.debug("No messages were received for message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
                        }
                        if (isRunningUnderCronExpression()) {
                            break;
                        }
                    } else {
                        String str = (String) fetch.getProperty(SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
                        if (str != null && (fetch instanceof Axis2MessageContext) && !str.equals(getAxis2ParameterValue(((Axis2MessageContext) fetch).getAxis2MessageContext().getConfigurationContext().getAxisConfiguration(), SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME))) {
                            return;
                        }
                        Set propertyKeySet = fetch.getPropertyKeySet();
                        if (propertyKeySet != null && propertyKeySet.contains("blocking.sender.error")) {
                            propertyKeySet.remove("blocking.sender.error");
                        }
                        this.isTerminated = this.messageProcessor.isDeactivated();
                        dispatch(fetch);
                    }
                } else {
                    this.isTerminated = true;
                    if (log.isDebugEnabled()) {
                        log.debug("Exiting service since the message processor is deactivated");
                    }
                }
            } catch (Throwable th) {
                log.fatal("Deactivating the message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX, th);
                this.messageProcessor.deactivate();
            }
            if (log.isDebugEnabled()) {
                log.debug("Exiting the iteration of message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
            }
            if (isRunningUnderCronExpression()) {
                try {
                    Thread.sleep(this.throttlingInterval);
                } catch (InterruptedException e) {
                    log.debug("Current Thread was interrupted while it is sleeping.");
                }
            }
            if (this.interval > 0 && this.interval < 1000) {
                try {
                    Thread.sleep(this.interval);
                } catch (InterruptedException e2) {
                    log.debug("Current Thread was interrupted while it is sleeping.");
                }
            }
            if ((this.isThrottling && new Date().getTime() - time > 1000) || (!this.isThrottling && !isRunningUnderCronExpression())) {
                break;
            }
        } while (!this.isTerminated);
        if (log.isDebugEnabled()) {
            log.debug("Exiting service thread of message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
        }
    }

    private static String getAxis2ParameterValue(AxisConfiguration axisConfiguration, String str) {
        Object value;
        Parameter parameter = axisConfiguration.getParameter(str);
        if (parameter == null || (value = parameter.getValue()) == null || !(value instanceof String)) {
            return null;
        }
        return (String) parameter.getValue();
    }

    @Override // org.apache.synapse.ManagedLifecycle
    public void init(SynapseEnvironment synapseEnvironment) {
        setMessageConsumer();
        Map<String, Object> parameters = this.messageProcessor.getParameters();
        if (parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS) != null) {
            this.maxDeliverAttempts = Integer.parseInt((String) parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS));
        }
        if (parameters.get(MessageProcessorConstants.RETRY_INTERVAL) != null) {
            this.retryInterval = Integer.parseInt((String) parameters.get(MessageProcessorConstants.RETRY_INTERVAL));
        }
        this.replySeq = (String) parameters.get(ForwardingProcessorConstants.REPLY_SEQUENCE);
        this.faultSeq = (String) parameters.get(ForwardingProcessorConstants.FAULT_SEQUENCE);
        this.targetEndpoint = (String) parameters.get(ForwardingProcessorConstants.TARGET_ENDPOINT);
        if (parameters.get("throttle") != null) {
            this.isThrottling = Boolean.parseBoolean((String) parameters.get("throttle"));
        }
        if (parameters.get(ForwardingProcessorConstants.CRON_EXPRESSION) != null) {
            this.cronExpression = String.valueOf(parameters.get(ForwardingProcessorConstants.CRON_EXPRESSION));
        }
        if (this.cronExpression != null && parameters.get(ForwardingProcessorConstants.THROTTLE_INTERVAL) != null) {
            this.throttlingInterval = Long.parseLong((String) parameters.get(ForwardingProcessorConstants.THROTTLE_INTERVAL));
        }
        this.nonRetryStatusCodes = (String[]) parameters.get(ForwardingProcessorConstants.NON_RETRY_STATUS_CODES);
        if (parameters.get(ForwardingProcessorConstants.MAX_DELIVERY_DROP) != null && parameters.get(ForwardingProcessorConstants.MAX_DELIVERY_DROP).toString().equals(Sandesha2Constants.Assertions.ELEM_ENABLED) && this.maxDeliverAttempts > 0) {
            this.isMaxDeliveryAttemptDropEnabled = true;
        }
        this.interval = Long.parseLong((String) parameters.get(MessageProcessorConstants.INTERVAL));
        this.initialized = true;
    }

    private Set<Integer> getNonRetryStatusCodes() {
        HashSet hashSet = new HashSet();
        if (this.nonRetryStatusCodes != null) {
            for (String str : this.nonRetryStatusCodes) {
                try {
                    hashSet.add(Integer.valueOf(Integer.parseInt(str)));
                } catch (NumberFormatException e) {
                }
            }
        }
        return hashSet;
    }

    public MessageContext fetch(MessageConsumer messageConsumer) {
        return this.messageConsumer.receive();
    }

    public void dispatch(MessageContext messageContext) {
        OMElement firstElement;
        if (log.isDebugEnabled()) {
            log.debug("Sending the message to client with message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
        }
        if (this.targetEndpoint == null) {
            this.targetEndpoint = (String) messageContext.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
        }
        MessageContext messageContext2 = null;
        SOAPEnvelope envelope = messageContext.getEnvelope();
        if (this.targetEndpoint == null) {
            log.warn("Property target.endpoint not found in the message context , Hence removing the message ");
            this.messageConsumer.ack();
            return;
        }
        Endpoint endpoint = messageContext.getEndpoint(this.targetEndpoint);
        while (!this.isSuccessful && !this.isTerminated) {
            try {
                try {
                    messageContext.setEnvelope(MessageHelper.cloneSOAPEnvelope(envelope));
                    OMElement oMElement = null;
                    org.apache.axis2.context.MessageContext axis2MessageContext = ((Axis2MessageContext) messageContext).getAxis2MessageContext();
                    if (JsonUtil.hasAJsonPayload(axis2MessageContext)) {
                        oMElement = axis2MessageContext.getEnvelope().getBody().getFirstElement();
                    }
                    if (JsonUtil.hasAJsonPayload(oMElement) && (firstElement = messageContext.getEnvelope().getBody().getFirstElement()) != null) {
                        firstElement.detach();
                        messageContext.getEnvelope().getBody().addChild(oMElement);
                    }
                    axis2MessageContext.setProperty(HTTPConstants.NON_ERROR_HTTP_STATUS_CODES, getNonRetryStatusCodes());
                    messageContext2 = this.sender.send(endpoint, messageContext);
                    this.isSuccessful = true;
                } catch (Exception e) {
                    if (e instanceof SynapseException) {
                        this.isSuccessful = isNonRetryErrorCode(e.getCause().getMessage());
                    }
                    if (!this.isSuccessful) {
                        log.error("BlockingMessageSender of message processor [" + this.messageProcessor.getName() + "] failed to send message to the endpoint");
                        sendThroughFaultSeq(messageContext);
                    }
                }
                if (this.isSuccessful) {
                    if (messageContext2 == null) {
                        this.messageConsumer.ack();
                        this.attemptCount = 0;
                        this.isSuccessful = true;
                        if (log.isDebugEnabled()) {
                            log.debug("Successfully sent the message to endpoint [" + endpoint.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX + " with message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
                        }
                    } else if ("true".equals(messageContext2.getProperty("blocking.sender.error"))) {
                        this.isSuccessful = isNonRetryErrorCode((String) messageContext2.getProperty("ERROR_MESSAGE"));
                        if (this.isSuccessful) {
                            sendThroughReplySeq(messageContext2);
                        } else {
                            log.error("BlockingMessageSender of message processor [" + this.messageProcessor.getName() + "] failed to send message to the endpoint");
                            sendThroughFaultSeq(messageContext2);
                        }
                    } else {
                        sendThroughReplySeq(messageContext2);
                        this.messageConsumer.ack();
                        this.attemptCount = 0;
                        this.isSuccessful = true;
                        if (log.isDebugEnabled()) {
                            log.debug("Successfully sent the message to endpoint [" + endpoint.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX + " with message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
                        }
                    }
                }
                if (!this.isSuccessful) {
                    prepareToRetry();
                } else if (this.messageProcessor.isPaused()) {
                    this.messageProcessor.resumeService();
                    log.info("Resuming the service of message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
                }
            } catch (Exception e2) {
                log.error("Message processor [" + this.messageProcessor.getName() + "] failed to send the message to client", e2);
                return;
            }
        }
    }

    public void sendThroughFaultSeq(MessageContext messageContext) {
        if (this.faultSeq == null) {
            log.warn("Failed to send the message through the fault sequence. Sequence name does not Exist.");
            return;
        }
        Mediator sequence = messageContext.getSequence(this.faultSeq);
        if (sequence == null) {
            log.warn("Failed to send the message through the fault sequence. Sequence [" + this.faultSeq + "] does not Exist.");
        } else {
            sequence.mediate(messageContext);
        }
    }

    public void sendThroughReplySeq(MessageContext messageContext) {
        if (this.replySeq == null) {
            this.messageProcessor.deactivate();
            log.error("Failed to send the out message. Reply sequence does not Exist. Deactivated the message processor");
            return;
        }
        Mediator sequence = messageContext.getSequence(this.replySeq);
        if (sequence != null) {
            sequence.mediate(messageContext);
        } else {
            this.messageProcessor.deactivate();
            log.error("Failed to send the out message. Reply sequence [" + this.replySeq + "] does not exist. Deactivated the message processor");
        }
    }

    public boolean terminate() {
        try {
            this.isTerminated = true;
            if (!log.isDebugEnabled()) {
                return true;
            }
            log.debug("Successfully terminated job of message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
            return true;
        } catch (Exception e) {
            log.error("Failed to terminate the job of message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
            return false;
        }
    }

    private void checkAndDeactivateProcessor() {
        if (this.maxDeliverAttempts > 0) {
            this.attemptCount++;
            if (this.attemptCount >= this.maxDeliverAttempts) {
                if (this.isMaxDeliveryAttemptDropEnabled) {
                    dropMessageAndContinueMessageProcessor();
                    if (log.isDebugEnabled()) {
                        log.debug("Message processor [" + this.messageProcessor.getName() + "] Dropped the failed message and continue due to reach of max attempts");
                        return;
                    }
                    return;
                }
                terminate();
                this.messageProcessor.deactivate();
                if (log.isDebugEnabled()) {
                    log.debug("Message processor [" + this.messageProcessor.getName() + "] stopped due to reach of max attempts");
                }
            }
        }
    }

    private void prepareToRetry() {
        if (this.isTerminated) {
            return;
        }
        if (!this.messageProcessor.isPaused()) {
            this.messageProcessor.pauseService();
            log.info("Pausing the service of message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
        }
        checkAndDeactivateProcessor();
        if (log.isDebugEnabled()) {
            log.debug("Failed to send to client retrying after " + this.retryInterval + "s with attempt count - " + this.attemptCount);
        }
        try {
            Thread.sleep(this.retryInterval);
        } catch (InterruptedException e) {
        }
    }

    private void resetService() {
        this.isSuccessful = false;
        this.attemptCount = 0;
    }

    private boolean isNonRetryErrorCode(String str) {
        boolean z = false;
        if (this.nonRetryStatusCodes != null) {
            String[] strArr = this.nonRetryStatusCodes;
            int length = strArr.length;
            int i = 0;
            while (true) {
                if (i >= length) {
                    break;
                }
                String str2 = strArr[i];
                if (str != null && str.contains(str2)) {
                    z = true;
                    break;
                }
                i++;
            }
        }
        return z;
    }

    private boolean isRunningUnderCronExpression() {
        return this.cronExpression != null && this.throttlingInterval > -1;
    }

    private void dropMessageAndContinueMessageProcessor() {
        this.messageConsumer.ack();
        this.attemptCount = 0;
        this.isSuccessful = true;
        if (this.messageProcessor.isPaused()) {
            this.messageProcessor.resumeService();
        }
        log.info("Removed failed message and continue the message processor [" + this.messageProcessor.getName() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
    }

    private boolean setMessageConsumer() {
        this.messageConsumer = this.synapseEnvironment.getSynapseConfiguration().getMessageStore(this.messageProcessor.getMessageStoreName()).getConsumer();
        return this.messageProcessor.setMessageConsumer(this.messageConsumer);
    }

    public boolean isInitialized() {
        return this.initialized;
    }

    @Override // org.apache.synapse.ManagedLifecycle
    public void destroy() {
        terminate();
    }
}
