InboundHttpServerWorker.java

/*
 * Copyright (c) 2005-2014, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.wso2.carbon.inbound.endpoint.protocol.http;

import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.context.ServiceContext;
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.InOutAxisOperation;
import org.apache.axis2.util.Utils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.protocol.HTTP;
import org.apache.synapse.SynapseException;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.core.axis2.MessageContextCreatorForAxis2;
import org.apache.synapse.core.axis2.ResponseState;
import org.apache.synapse.core.axis2.SynapseMessageReceiver;
import org.apache.synapse.inbound.InboundEndpoint;
import org.apache.synapse.inbound.InboundEndpointConstants;
import org.apache.synapse.mediators.MediatorFaultHandler;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.rest.RESTRequestHandler;
import org.apache.synapse.transport.customlogsetter.CustomLogSetter;
import org.apache.synapse.transport.passthru.ServerWorker;
import org.apache.synapse.transport.passthru.SourceRequest;
import org.apache.synapse.transport.passthru.config.SourceConfiguration;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.core.multitenancy.utils.TenantAxisUtils;
import org.wso2.carbon.inbound.endpoint.protocol.http.management.HTTPEndpointManager;
import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
import org.wso2.carbon.utils.multitenancy.MultitenantUtils;

import java.io.OutputStream;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Create SynapseMessageContext from HTTP Request and inject it to the sequence in a synchronous manner
 * This is the worker for HTTP inbound related requests.
 */
public class InboundHttpServerWorker extends ServerWorker {

    private static final Log log = LogFactory.getLog(InboundHttpServerWorker.class);

    private SourceRequest request = null;
    private int port;
    private String tenantDomain;
    private RESTRequestHandler restHandler;
    private Pattern dispatchPattern;
    private Matcher patternMatcher;

    public InboundHttpServerWorker(int port, String tenantDomain,
                                   SourceRequest sourceRequest,
                                   SourceConfiguration sourceConfiguration,
                                   OutputStream outputStream) {
        super(sourceRequest, sourceConfiguration, outputStream);
        this.request = sourceRequest;
        this.port = port;
        this.tenantDomain = tenantDomain;
        restHandler = new RESTRequestHandler();
    }

    public void run() {
        if (request != null) {
            try {
                //get already created axis2 context from ServerWorker
                MessageContext axis2MsgContext = getRequestContext();

                //create Synapse Message Context
                org.apache.synapse.MessageContext synCtx =
                        createSynapseMessageContext(request, axis2MsgContext);
                updateAxis2MessageContextForSynapse(synCtx);

                setInboundProperties(synCtx);
                // setting ResponseState for http inbound endpoint request
                synCtx.setProperty(SynapseConstants.RESPONSE_STATE, new ResponseState());
                String method = request.getRequest() != null ? request.getRequest().
                        getRequestLine().getMethod().toUpperCase() : "";
                processHttpRequestUri(axis2MsgContext, method);

                String endpointName =
                        HTTPEndpointManager.getInstance().getEndpointName(port, tenantDomain);
                if (endpointName == null) {
                    handleException("Endpoint not found for port : " + port + "" +
                                    " tenant domain : " + tenantDomain);
                }
                InboundEndpoint endpoint = synCtx.getConfiguration().getInboundEndpoint(endpointName);

                if (endpoint == null) {
                    log.error("Cannot find deployed inbound endpoint " + endpointName + "for process request");
                    return;
                }

//                OpenEventCollector.reportEntryEvent(synCtx, endpointName, endpoint.getAspectConfiguration(),
//                                                    ComponentType.INBOUNDENDPOINT);

                CustomLogSetter.getInstance().setLogAppender(endpoint.getArtifactContainerName());

                if (!isRESTRequest(axis2MsgContext, method)) {
                    if (request.isEntityEnclosing()) {
                        processEntityEnclosingRequest(axis2MsgContext, false);
                    } else {
                        processNonEntityEnclosingRESTHandler(null, axis2MsgContext, false);
                    }
                } else {
                    AxisOperation axisOperation = ((Axis2MessageContext) synCtx).getAxis2MessageContext().getAxisOperation();
                    ((Axis2MessageContext) synCtx).getAxis2MessageContext().setAxisOperation(null);
                    String contentTypeHeader = request.getHeaders().get(HTTP.CONTENT_TYPE);
                    SOAPEnvelope soapEnvelope = handleRESTUrlPost(contentTypeHeader);
                    processNonEntityEnclosingRESTHandler(soapEnvelope, axis2MsgContext, false);
                    ((Axis2MessageContext) synCtx).getAxis2MessageContext().setAxisOperation(axisOperation);

                }

                dispatchPattern = HTTPEndpointManager.getInstance().getPattern(tenantDomain, port);

                boolean continueDispatch = true;
                if (dispatchPattern != null) {
                    patternMatcher = dispatchPattern.matcher(request.getUri());
                    if (!patternMatcher.matches()) {
                        if (log.isDebugEnabled()) {
                            log.debug("Requested URI does not match given dispatch regular expression.");
                        }
                        continueDispatch = false;
                    }
                }

                if (continueDispatch && dispatchPattern != null) {

                    boolean processedByAPI = false;

                    // Trying to dispatch to an API
                    processedByAPI = restHandler.process(synCtx);
                    if (log.isDebugEnabled()) {
                        log.debug("Dispatch to API state : enabled, Message is "
                                  + (!processedByAPI ? "NOT" : "") + "processed by an API");
                    }

                    if (!processedByAPI) {
                        //check the validity of message routing to axis2 path
                        boolean isAxis2Path = isAllowedAxis2Path(synCtx);

                        if (isAxis2Path) {
                            //create axis2 message context again to avoid settings updated above
                            axis2MsgContext = createMessageContext(null, request);

                            processHttpRequestUri(axis2MsgContext, method);

                            //set inbound properties for axis2 context
                            setInboundProperties(axis2MsgContext);

                            if (!isRESTRequest(axis2MsgContext, method)) {
                                if (request.isEntityEnclosing()) {
                                    processEntityEnclosingRequest(axis2MsgContext, isAxis2Path);
                                } else {
                                    processNonEntityEnclosingRESTHandler(null, axis2MsgContext, isAxis2Path);
                                }
                            } else {
                                String contentTypeHeader = request.getHeaders().get(HTTP.CONTENT_TYPE);
                                SOAPEnvelope soapEnvelope = handleRESTUrlPost(contentTypeHeader);
                                processNonEntityEnclosingRESTHandler(soapEnvelope,axis2MsgContext,true);
                            }
                        } else {
                            //this case can only happen regex exists and it DOES match
                            //BUT there is no api or proxy found message to be injected
                            //should be routed to the main sequence instead inbound defined sequence
                            injectToMainSequence(synCtx, endpoint);
                        }
                    }
                } else if (continueDispatch && dispatchPattern == null) {
                    // else if for clarity compiler will optimize
                    injectToSequence(synCtx, endpoint);
                } else {
                    //this case can only happen regex exists and it DOES NOT match
                    //should be routed to the main sequence instead inbound defined sequence
                    injectToMainSequence(synCtx, endpoint);
                }

                SynapseMessageReceiver.doPostInjectUpdates(synCtx);
                // send ack for client if needed
                sendAck(axis2MsgContext);
            } catch (Exception e) {
                log.error("Exception occurred when running " + InboundHttpServerWorker.class.getName(), e);
            }
        } else {
            log.error("InboundSourceRequest cannot be null");
        }
    }

    private void injectToMainSequence(org.apache.synapse.MessageContext synCtx,
                                      InboundEndpoint endpoint) {

        SequenceMediator injectingSequence = (SequenceMediator) synCtx.getMainSequence();

        SequenceMediator faultSequence = getFaultSequence(synCtx, endpoint);

        MediatorFaultHandler mediatorFaultHandler = new MediatorFaultHandler(faultSequence);
        synCtx.pushFaultHandler(mediatorFaultHandler);

        /* handover synapse message context to synapse environment for inject it to given
        sequence in synchronous manner*/
        if (log.isDebugEnabled()) {
            log.debug("injecting message to sequence : " + endpoint.getInjectingSeq());
        }
        synCtx.getEnvironment().injectMessage(synCtx, injectingSequence);
    }

    private void injectToSequence(org.apache.synapse.MessageContext synCtx, InboundEndpoint endpoint) {
        // Get injecting sequence for synapse engine
        SequenceMediator injectingSequence = null;
        if (endpoint.getInjectingSeq() != null) {

            injectingSequence = (SequenceMediator) synCtx.getSequence(endpoint.getInjectingSeq());
        }

        if (injectingSequence == null) {
            injectingSequence = (SequenceMediator) synCtx.getMainSequence();
        }

        SequenceMediator faultSequence = getFaultSequence(synCtx, endpoint);

        MediatorFaultHandler mediatorFaultHandler = new MediatorFaultHandler(faultSequence);
        synCtx.pushFaultHandler(mediatorFaultHandler);

        /* handover synapse message context to synapse environment for inject it to given sequence in
        synchronous manner*/
        if (log.isDebugEnabled()) {
            log.debug("injecting message to sequence : " + endpoint.getInjectingSeq());
        }
        synCtx.setProperty("inbound.endpoint.name", endpoint.getName());
        synCtx.getEnvironment().injectMessage(synCtx, injectingSequence);
    }

    private SequenceMediator getFaultSequence(org.apache.synapse.MessageContext synCtx, InboundEndpoint endpoint) {
        SequenceMediator faultSequence = null;
        if (endpoint.getOnErrorSeq() != null) {
            faultSequence = (SequenceMediator) synCtx.getSequence(endpoint.getOnErrorSeq());
        }

        if (faultSequence == null) {
            faultSequence = (SequenceMediator) synCtx.getFaultSequence();
        }

        return faultSequence;
    }

    /**
     * Set Inbound Related Properties for Synapse Message Context
     *
     * @param msgContext Synapse Message Context of incoming request
     */
    private void setInboundProperties(org.apache.synapse.MessageContext msgContext) {
        msgContext.setProperty(SynapseConstants.IS_INBOUND, true);
        msgContext.setProperty(InboundEndpointConstants.INBOUND_ENDPOINT_RESPONSE_WORKER,
                               new InboundHttpResponseSender());
        msgContext.setWSAAction(request.getHeaders().get(InboundHttpConstants.SOAP_ACTION));
    }

    /**
     * Set Inbound Related Properties for Axis2 Message Context
     *
     * @param axis2Context Axis2 Message Context of incoming request
     */
    private void setInboundProperties(MessageContext axis2Context) {
        axis2Context.setProperty(SynapseConstants.IS_INBOUND, true);
    }

    protected void handleException(String msg) {
        log.error(msg);
        throw new SynapseException(msg);
    }

    /**
     * Checks whether the message should be routed to Axis2 path
     *
     * @param synapseMsgContext Synapse Message Context of incoming message
     * @return true if the message should be routed, false otherwise
     */
    private boolean isAllowedAxis2Path(org.apache.synapse.MessageContext synapseMsgContext) {
        boolean isProxy = false;

        String reqUri = request.getUri();
        String tenant = MultitenantUtils.getTenantDomainFromUrl(request.getUri());
        String servicePath = getSourceConfiguration().getConfigurationContext().getServicePath();

        //for tenants, service path will be appended by tenant name
        if (!reqUri.equalsIgnoreCase(tenant)) {
            servicePath = servicePath + "/t/" + tenant;
        }

        //Get the operation part from the request URL
        // e.g. '/services/TestProxy/' > TestProxy when service path is '/service/' > result 'TestProxy/'
        String serviceOpPart = Utils.getServiceAndOperationPart(reqUri,
                                                                servicePath);
        //if proxy, then check whether it is deployed in the environment
        if (serviceOpPart != null) {
            isProxy = isProxyDeployed(synapseMsgContext, serviceOpPart);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Requested Proxy Service '" + serviceOpPart + "' is not deployed");
            }
        }
        return isProxy;
    }

    /**
     * Checks whether the given proxy is deployed in synapse environment
     *
     * @param synapseContext   Synapse Message Context of incoming message
     * @param serviceOpPart String name of the service operation
     * @return true if the proxy is deployed, false otherwise
     */
    private boolean isProxyDeployed(org.apache.synapse.MessageContext synapseContext,
                                    String serviceOpPart) {
        boolean isDeployed = false;

        //extract proxy name from serviceOperation, get the first portion split by '/'
        String proxyName = serviceOpPart.split("/")[0];

        //check whether the proxy is deployed in synapse environment
        if (synapseContext.getConfiguration().getProxyService(proxyName) != null) {
            isDeployed = true;
        }
        return isDeployed;
    }

    /**
     * Creates synapse message context from axis2 context
     *
     * @param inboundSourceRequest Source Request of inbound
     * @param axis2Context         Axis2 message context of message
     * @return Synapse Message Context instance
     * @throws AxisFault
     */
    private org.apache.synapse.MessageContext createSynapseMessageContext(
            SourceRequest inboundSourceRequest, MessageContext axis2Context) throws AxisFault {

        // Create super tenant message context
        MessageContext axis2MsgCtx = axis2Context;

        // If not super tenant, assign tenant configuration context
        if (!tenantDomain.equals(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME)) {
            try {
                // We have to start the tenant flow inorder to get the tenant from the tenant cache.
                ConfigurationContext tenantConfigCtx = null;
                try {
                    PrivilegedCarbonContext.startTenantFlow();
                    PrivilegedCarbonContext privilegedCarbonContext = PrivilegedCarbonContext
                            .getThreadLocalCarbonContext();
                    privilegedCarbonContext.setTenantDomain(tenantDomain, true);

                    tenantConfigCtx = TenantAxisUtils.getTenantConfigurationContext(tenantDomain,
                            axis2MsgCtx.getConfigurationContext());
                } finally {
                    PrivilegedCarbonContext.endTenantFlow();
                }

                axis2MsgCtx.setConfigurationContext(tenantConfigCtx);
                axis2MsgCtx.setProperty(MultitenantConstants.TENANT_DOMAIN, tenantDomain);
            } catch (Exception e) {
                log.warn("Could not get tenant configuration context for tenant " + tenantDomain + ". " +
                        "Tenant may not exist. Message will be dispatched to super tenant.");
                tenantDomain = MultitenantConstants.SUPER_TENANT_DOMAIN_NAME;
            }
        }
        return MessageContextCreatorForAxis2.getSynapseMessageContext(axis2MsgCtx);
    }

    /**
     * Updates additional properties in Axis2 Message Context from Synapse Message Context
     *
     * @param synCtx Synapse Message Context
     * @return Updated Axis2 Message Context
     * @throws AxisFault
     */
    private org.apache.synapse.MessageContext updateAxis2MessageContextForSynapse(
               org.apache.synapse.MessageContext synCtx) throws AxisFault {

        ServiceContext svcCtx = new ServiceContext();
        OperationContext opCtx = new OperationContext(new InOutAxisOperation(), svcCtx);

        ((Axis2MessageContext) synCtx).getAxis2MessageContext().setServiceContext(svcCtx);
        ((Axis2MessageContext) synCtx).getAxis2MessageContext().setOperationContext(opCtx);

        return synCtx;
    }
}