// CloneMediator.java

/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 */

package org.apache.synapse.mediators.eip.splitter;

import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.context.OperationContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.synapse.ContinuationState;
import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseLog;
import org.apache.synapse.aspects.AspectConfiguration;
import org.apache.synapse.aspects.ComponentType;
import org.apache.synapse.aspects.flow.statistics.StatisticIdentityGenerator;
import org.apache.synapse.aspects.flow.statistics.collectors.OpenEventCollector;
import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector;
import org.apache.synapse.aspects.flow.statistics.data.artifact.ArtifactHolder;
import org.apache.synapse.continuation.ContinuationStackManager;
import org.apache.synapse.continuation.ReliantContinuationState;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.mediators.AbstractMediator;
import org.apache.synapse.mediators.FlowContinuableMediator;
import org.apache.synapse.mediators.Value;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.mediators.eip.SharedDataHolder;
import org.apache.synapse.mediators.eip.EIPConstants;
import org.apache.synapse.mediators.eip.Target;
import org.apache.synapse.util.MessageHelper;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * This mediator will clone the message into multiple messages and mediate as specified in the
 * target elements. A target specifies or refers to a sequence or an endpoint, and optionally
 * specifies an Action and/or To address to be set to the cloned message.
 *
 * <p>Two modes are supported:
 * <ul>
 *   <li>When {@code iterations} is empty, one clone is created per configured target and each
 *       clone is handed to its own target.</li>
 *   <li>When {@code iterations} is non-empty, the resolved number of clones is created and every
 *       clone is handed to the first configured target.</li>
 * </ul>
 */
public class CloneMediator extends AbstractMediator implements ManagedLifecycle,
                                                               FlowContinuableMediator {

    /**
     * Continue processing the parent message or not?
     * (i.e. message which is subjected to cloning)
     */
    private boolean continueParent = false;

    /** the list of targets to which cloned copies of the message will be given for mediation */
    private List<Target> targets = new ArrayList<Target>();

    /**
     * Optional identifier of this clone mediator. When set it is appended (after a ".") to the
     * shared-data-holder, correlation and message-sequence property names.
     */
    private String id = null;

    /**
     * Sequential flag. Inside this class it only affects the log text and the
     * stop-on-failure check; how the targets act on it is outside this class.
     */
    private boolean sequential = false;

    /** Reference to the synapse environment */
    private SynapseEnvironment synapseEnv;

    /** Static iteration count as text; a non-empty value switches the mediator to iteration mode */
    private String iterations;

    /** Optional dynamic source of the iteration count; takes precedence over {@link #iterations} */
    private Value dynamicIterationsValue;

    /** Property set on the parent message holding the 1-based index of the current iteration */
    private static final String ITERATION_INDEX_PROPERTY_NAME = "CLONED_ITERATION_INDEX";

    /** Property which, when "true" (case-insensitive), stops sequential submission after a failure */
    private static final String STOP_FLOW_ON_FAILURE_PROPERTY_NAME = "STOP_TARGET_EXECUTION_ON_FAILURE";

    /**
     * This will implement the mediate method of the Mediator interface and will provide the
     * functionality of cloning message into the specified targets and mediation
     *
     * @param synCtx - MessageContext which is subjected to the cloning
     * @return boolean true if this needs to be further mediated (continueParent=true)
     */
    public boolean mediate(MessageContext synCtx) {

        if (synCtx.getEnvironment().isDebuggerEnabled()) {
            if (super.divertMediationRoute(synCtx)) {
                return true;
            }
        }

        SynapseLog synLog = getLog(synCtx);

        if (synLog.isTraceOrDebugEnabled()) {
            synLog.traceOrDebug("Start : Clone mediator");

            if (synLog.isTraceTraceEnabled()) {
                synLog.traceTrace("Message : " + synCtx.getEnvelope());
            }
        }

        // a fresh shared data holder is attached to the parent before any clone is created
        String sharedDataHolderKey = (id != null)
                ? EIPConstants.EIP_SHARED_DATA_HOLDER + "." + id
                : EIPConstants.EIP_SHARED_DATA_HOLDER;
        synCtx.setProperty(sharedDataHolderKey, new SharedDataHolder());

        if (!StringUtils.isEmpty(iterations)) {
            // get the first target, clone the message for the number of iterations and then
            // mediate the cloned messages in the target for the number of iterations
            executeTargetIterations(synCtx);
        } else {
            // get the targets list, clone the message for the number of targets and then
            // mediate the cloned messages using the targets
            executeTargets(synCtx, synLog);
        }

        // if the continuation of the parent message is stopped from here set the RESPONSE_WRITTEN
        // property to SKIP to skip the blank http response
        OperationContext opCtx
            = ((Axis2MessageContext) synCtx).getAxis2MessageContext().getOperationContext();
        if (!continueParent && opCtx != null) {
            opCtx.setProperty(Constants.RESPONSE_WRITTEN, "SKIP");
        }

        // finalize tracing and debugging
        synLog.traceOrDebug("End : Clone mediator");

        // if continue parent is true mediators after the clone will be called for the further
        // mediation of the message which is subjected for cloning (parent message)
        return continueParent;
    }

    /**
     * Creates one clone per configured target and hands each clone to its target. The target's
     * index in the list is recorded as the sub-branch of the clone's continuation state.
     *
     * @param synCtx - the parent message which is cloned
     * @param synLog - log of the parent message
     */
    private void executeTargets(MessageContext synCtx, SynapseLog synLog) {
        boolean stopFlowOnFailure = isStopFlowOnFailure(synCtx);
        int targetCount = targets.size();
        int index = 0;
        for (Target target : targets) {
            logSubmission(synLog, index + 1, targetCount);

            MessageContext clonedMsgCtx = getClonedMessageContext(synCtx, index, targetCount);
            ContinuationStackManager.addReliantContinuationState(clonedMsgCtx, index,
                    getMediatorPosition());
            boolean isSuccess = target.mediate(clonedMsgCtx);
            if (!isSuccess && sequential && stopFlowOnFailure) {
                break;
            }
            index++;
        }
    }

    /**
     * Creates the resolved number of clones and hands every clone to the first configured target.
     * Before each clone is taken, the 1-based iteration index is set on the parent message.
     *
     * @param synCtx - the parent message which is cloned
     */
    private void executeTargetIterations(MessageContext synCtx) {
        int noOfIterations = resolveIterationsCount(synCtx);
        SynapseLog synLog = getLog(synCtx);
        Target target = targets.get(0);
        boolean stopFlowOnFailure = isStopFlowOnFailure(synCtx);
        for (int i = 0; i < noOfIterations; ++i) {
            logSubmission(synLog, i + 1, noOfIterations);
            synCtx.setProperty(ITERATION_INDEX_PROPERTY_NAME, i + 1);
            MessageContext clonedMsgCtx = getClonedMessageContext(synCtx, i, noOfIterations);
            // Every iteration runs through targets.get(0), so the sub-branch must be 0: the
            // continuation-based mediate() resolves the branch with targets.get(subBranch).
            ContinuationStackManager.addReliantContinuationState(clonedMsgCtx, 0, getMediatorPosition());
            boolean isSuccess = target.mediate(clonedMsgCtx);
            if (!isSuccess && sequential && stopFlowOnFailure) {
                break;
            }
        }
    }

    /**
     * Reads the stop-on-failure switch from the parent message.
     *
     * @param synCtx - the parent message
     * @return true when the property value equals "true", ignoring case
     */
    private boolean isStopFlowOnFailure(MessageContext synCtx) {
        return "true".equalsIgnoreCase((String)
                synCtx.getProperty(STOP_FLOW_ON_FAILURE_PROPERTY_NAME));
    }

    /**
     * Emits the trace/debug line announcing that a clone is about to be submitted.
     *
     * @param synLog   - log to write to
     * @param position - 1-based position of the clone
     * @param total    - total number of clones that will be submitted
     */
    private void logSubmission(SynapseLog synLog, int position, int total) {
        if (synLog.isTraceOrDebugEnabled()) {
            synLog.traceOrDebug("Submitting " + position + " of " + total +
                    " messages for " + (isSequential() ? "sequential processing" : "parallel processing"));
        }
    }

    /**
     * Resolves the number of iterations, preferring the dynamic value over the static text.
     * A missing or non-numeric value is reported through handleException, and the parse
     * failure is passed along as the cause.
     *
     * @param synCtx - message used to evaluate the dynamic value and to report errors
     * @return the parsed iteration count
     */
    private int resolveIterationsCount(MessageContext synCtx) {
        Value dynamicValue = getDynamicIterationsValue();
        String countStr = (dynamicValue != null) ? dynamicValue.evaluateValue(synCtx) : getIterations();
        if (countStr == null) {
            handleException("Error while parsing iterations number in clone mediator", synCtx);
        } else {
            try {
                return Integer.parseInt(countStr.trim());
            } catch (NumberFormatException e) {
                handleException("Error while parsing iterations number in clone mediator", e, synCtx);
            }
        }
        // NOTE(review): handleException presumably always throws, making this line a
        // compiler-only fallback - confirm against AbstractMediator.
        return 0;
    }

    /**
     * Resumes mediation of a cloned message from a stored continuation state. The branch to
     * resume is the sequence of the target at the sub-branch index recorded in the state.
     *
     * @param synCtx            - the message being resumed
     * @param continuationState - state recorded for this mediator; must be a
     *                            ReliantContinuationState
     * @return the result of the resumed branch mediation
     */
    public boolean mediate(MessageContext synCtx,
                           ContinuationState continuationState) {
        SynapseLog synLog = getLog(synCtx);

        if (synLog.isTraceOrDebugEnabled()) {
            synLog.traceOrDebug("Clone mediator : Mediating from ContinuationState");
        }

        boolean result;
        int subBranch = ((ReliantContinuationState) continuationState).getSubBranch();

        SequenceMediator branchSequence = targets.get(subBranch).getSequence();
        boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled();
        if (!continuationState.hasChild()) {
            // no nested state: continue the branch sequence after the recorded position
            result = branchSequence.mediate(synCtx, continuationState.getPosition() + 1);
        } else {
            // nested state: let the flow continuable child at the recorded position resume
            FlowContinuableMediator mediator =
                    (FlowContinuableMediator) branchSequence.getChild(continuationState.getPosition());

            result = mediator.mediate(synCtx, continuationState.getChildContState());

            if (isStatisticsEnabled) {
                ((Mediator) mediator).reportCloseStatistics(synCtx, null);
            }
        }
        if (isStatisticsEnabled) {
            branchSequence.reportCloseStatistics(synCtx, null);
        }
        return result;
    }

    /**
     * clone the provided message context as a new message, and mark as the messageSequence'th
     * message context of a total of messageCount messages
     *
     * @param synCtx          - MessageContext which is subjected to the cloning
     * @param messageSequence - the position of this message of the cloned set
     * @param messageCount    - total of cloned copies
     *
     * @return MessageContext the cloned message context
     */
    private MessageContext getClonedMessageContext(MessageContext synCtx, int messageSequence,
                                                   int messageCount) {

        MessageContext newCtx = null;
        try {

            newCtx = MessageHelper.cloneMessageContext(synCtx);

            // Set isServerSide property in the cloned message context
            ((Axis2MessageContext) newCtx).getAxis2MessageContext().setServerSide(
                    ((Axis2MessageContext) synCtx).getAxis2MessageContext().isServerSide());

            // "<sequence><delimiter><count>" - set on the clone for aggregation purposes
            String sequenceValue = String.valueOf(messageSequence)
                    + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + messageCount;

            if (id != null) {
                // set the parent correlation details to the cloned MC -
                //                              for the use of aggregation like tasks
                newCtx.setProperty(EIPConstants.AGGREGATE_CORRELATION + "." + id,
                        synCtx.getMessageID());
                newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE + "." + id, sequenceValue);
            } else {
                newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE, sequenceValue);
            }
        } catch (AxisFault axisFault) {
            handleException("Error cloning the message context", axisFault, synCtx);
        }

        return newCtx;
    }

    ///////////////////////////////////////////////////////////////////////////////////////
    //                        Getters and Setters                                        //
    ///////////////////////////////////////////////////////////////////////////////////////

    public boolean isContinueParent() {
        return continueParent;
    }

    public void setContinueParent(boolean continueParent) {
        this.continueParent = continueParent;
    }

    public String getIterations() {
        return iterations;
    }

    public void setIterations(String iterations) {
        this.iterations = iterations;
    }

    public Value getDynamicIterationsValue() {
        return dynamicIterationsValue;
    }

    public void setDynamicIterationsValue(Value dynamicIterationsValue) {
        this.dynamicIterationsValue = dynamicIterationsValue;
    }

    public List<Target> getTargets() {
        return targets;
    }

    public void setTargets(List<Target> targets) {
        this.targets = targets;
    }

    public void addTarget(Target target) {
        this.targets.add(target);
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public boolean isSequential() {
        return sequential;
    }

    public void setSequential(boolean sequential) {
        this.sequential = sequential;
    }

    /**
     * @return always true for this mediator
     */
    @Override
    public boolean isContentAltering() {
        return true;
    }

    /**
     * Stores the environment and initializes every target. An inline sequence is initialized
     * directly. A referenced sequence that cannot be resolved, or is dynamic, is registered
     * with the environment as an unavailable artifact reference. A target's endpoint, if
     * any, is initialized as well.
     *
     * @param se - the synapse environment
     */
    public void init(SynapseEnvironment se) {

        synapseEnv = se;
        for (Target target : targets) {
            ManagedLifecycle seq = target.getSequence();
            if (seq != null) {
                seq.init(se);
            } else if (target.getSequenceRef() != null) {
                SequenceMediator targetSequence =
                        (SequenceMediator) se.getSynapseConfiguration().
                                getSequence(target.getSequenceRef());

                if (targetSequence == null || targetSequence.isDynamic()) {
                    se.addUnavailableArtifactRef(target.getSequenceRef());
                }
            }
            Endpoint endpoint = target.getEndpoint();
            if (endpoint != null) {
                endpoint.init(se);
            }
        }
    }

    /**
     * Mirrors {@link #init(SynapseEnvironment)}: destroys inline sequences and endpoints and
     * removes the unavailable artifact references for unresolved or dynamic referenced
     * sequences. Uses the environment stored by init.
     */
    public void destroy() {

        for (Target target : targets) {
            ManagedLifecycle seq = target.getSequence();
            if (seq != null) {
                seq.destroy();
            } else if (target.getSequenceRef() != null) {
                SequenceMediator targetSequence =
                        (SequenceMediator) synapseEnv.getSynapseConfiguration().
                                getSequence(target.getSequenceRef());

                if (targetSequence == null || targetSequence.isDynamic()) {
                    synapseEnv.removeUnavailableArtifactRef(target.getSequenceRef());
                }
            }
            Endpoint endpoint = target.getEndpoint();
            if (endpoint != null) {
                endpoint.destroy();
            }
        }
    }

    /**
     * Reports the opening of this mediator as a flow splitting event.
     *
     * @param messageContext    - the message being mediated
     * @param isContentAltering - OR-ed with this mediator's own isContentAltering() result
     * @return the value returned by the statistics collector
     */
    @Override
    public Integer reportOpenStatistics(MessageContext messageContext, boolean isContentAltering) {
        return OpenEventCollector.reportFlowSplittingEvent(messageContext, getMediatorName(), ComponentType.MEDIATOR,
                                                           getAspectConfiguration(),
                                                           isContentAltering() || isContentAltering);
    }

    /**
     * Assigns a statistics id to this mediator, creating the aspect configuration first if it
     * is absent, and then lets every target assign ids to its own mediators before the end
     * event for this mediator is reported.
     *
     * @param holder - holder passed to the statistic identity generator and to the targets
     */
    @Override
    public void setComponentStatisticsId(ArtifactHolder holder) {
        if (getAspectConfiguration() == null) {
            configure(new AspectConfiguration(getMediatorName()));
        }
        String sequenceId =
                StatisticIdentityGenerator.getIdForFlowContinuableMediator(getMediatorName(), ComponentType.MEDIATOR, holder);
        getAspectConfiguration().setUniqueId(sequenceId);
        for (Target target : targets) {
            target.setStatisticIdForMediators(holder);
        }

        StatisticIdentityGenerator.reportingFlowContinuableEndEvent(sequenceId, ComponentType.MEDIATOR, holder);
    }
}