HttpTargetResponseWorker.java

/*
 *  Copyright (c) 2022, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 *  WSO2 Inc. licenses this file to you under the Apache License,
 *  Version 2.0 (the "License"); you may not use this file except
 *  in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.synapse.transport.netty.sender;

import io.netty.handler.codec.http.HttpHeaders;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.builder.BuilderUtil;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.description.WSDL2Constants;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.engine.MessageReceiver;
import org.apache.axis2.util.JavaUtils;
import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpStatus;
import org.apache.http.protocol.HTTP;
import org.apache.synapse.transport.netty.BridgeConstants;
import org.apache.synapse.transport.netty.config.TargetConfiguration;
import org.apache.synapse.transport.netty.util.HttpUtils;
import org.apache.synapse.transport.nhttp.NhttpConstants;
import org.apache.synapse.transport.passthru.PassThroughConstants;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * {@code HttpTargetResponseWorker} is the Thread which does the response processing.
 */
public class HttpTargetResponseWorker implements Runnable {

    private static final Log LOG = LogFactory.getLog(HttpTargetResponseWorker.class);

    private final HttpCarbonMessage httpResponse;
    private final MessageContext requestMsgCtx;
    private final TargetConfiguration targetConfiguration;

    HttpTargetResponseWorker(MessageContext requestMsgCtx, HttpCarbonMessage httpResponse,
                             TargetConfiguration targetConfiguration) {

        this.httpResponse = httpResponse;
        this.requestMsgCtx = requestMsgCtx;
        this.targetConfiguration = targetConfiguration;
    }

    @Override
    public void run() {

        if (handleResponseFlow(httpResponse.getHttpStatusCode())) {
            return;
        }

        MessageContext responseMsgCtx;
        try {
            responseMsgCtx = createResponseMessageContext();
        } catch (AxisFault ex) {
            return;
        }

        handleLocationHeader(responseMsgCtx);

        try {
            populateProperties(responseMsgCtx);
        } catch (AxisFault e) {
            LOG.error("Error occurred while setting the SOAP envelope to the response message context", e);
            cleanup();
            return;
        }

        try {
            // Handover message to the axis engine for processing
            AxisEngine.receive(responseMsgCtx);
        } catch (AxisFault ex) {
            LOG.error("Error occurred while processing response message through Axis2", ex);
            String errorMessage = "Fault processing response message through Axis2: " + ex.getMessage();
            responseMsgCtx.setProperty(
                    NhttpConstants.SENDING_FAULT, Boolean.TRUE);
            responseMsgCtx.setProperty(
                    NhttpConstants.ERROR_CODE, NhttpConstants.RESPONSE_PROCESSING_FAILURE);
            responseMsgCtx.setProperty(
                    NhttpConstants.ERROR_MESSAGE, errorMessage.split("\n")[0]);
            responseMsgCtx.setProperty(
                    NhttpConstants.ERROR_DETAIL, JavaUtils.stackToString(ex));
            responseMsgCtx.setProperty(
                    NhttpConstants.ERROR_EXCEPTION, ex);
            try {
                responseMsgCtx.getAxisOperation().getMessageReceiver().receive(responseMsgCtx);
            } catch (AxisFault axisFault) {
                LOG.error("Error occurred while processing fault response message through Axis2", ex);
            }
        } finally {
            cleanup();
        }
    }

    private MessageContext createResponseMessageContext() throws AxisFault {

        MessageContext responseMsgCtx;
        try {
            responseMsgCtx = requestMsgCtx.getOperationContext().getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN);
        } catch (AxisFault ex) {
            LOG.error("Error getting the response message context from the operation context", ex);
            throw ex;
        }

        if (Objects.isNull(responseMsgCtx)) {
            if (requestMsgCtx.getOperationContext().isComplete()) {
                String msg = "Error getting IN message context from the operation context. "
                        + "Possibly an RM terminate sequence message";
                if (LOG.isDebugEnabled()) {
                    LOG.debug(msg);
                }
                throw new AxisFault(msg);
            }
            responseMsgCtx = new MessageContext();
            responseMsgCtx.setOperationContext(requestMsgCtx.getOperationContext());
        } else {
            // fix for RM to work because of a soapAction and wsaAction conflict
            responseMsgCtx.setSoapAction("");
        }
        return responseMsgCtx;
    }

    private boolean handleResponseFlow(int statusCode) {

        if (isStatusCode1xx(statusCode)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received a " + statusCode + " informational response.");
            }
            return true;
        }

        try {
            if (isStatusCode202(statusCode) && handle202(requestMsgCtx)) {
                return true;
            }
        } catch (AxisFault ex) {
            LOG.error("Error while handling the 202 response. ", ex);
            cleanup();
            return true;
        }
        return false;
    }

    /**
     * Populates required properties in the message context.
     *
     * @throws AxisFault if an error occurs while setting the SOAP envelope
     */
    private void populateProperties(MessageContext responseMsgCtx) throws AxisFault {

        responseMsgCtx.setServerSide(true);
        responseMsgCtx.setDoingREST(requestMsgCtx.isDoingREST());
        responseMsgCtx.setTransportIn(requestMsgCtx.getTransportIn());
        responseMsgCtx.setTransportOut(requestMsgCtx.getTransportOut());
        responseMsgCtx.setAxisMessage(requestMsgCtx.getOperationContext().getAxisOperation().
                getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE));
        responseMsgCtx.setOperationContext(requestMsgCtx.getOperationContext());
        responseMsgCtx.setConfigurationContext(requestMsgCtx.getConfigurationContext());
        responseMsgCtx.setTo(null);

        // Set status code and reason phrase
        int statusCode = httpResponse.getHttpStatusCode();
        responseMsgCtx.setProperty(BridgeConstants.HTTP_SC, statusCode);
        responseMsgCtx.setProperty(BridgeConstants.HTTP_SC_DESC, httpResponse.getReasonPhrase());
        responseMsgCtx.setProperty(BridgeConstants.HTTP_STATUS_CODE_SENT_FROM_BACKEND, statusCode);
        responseMsgCtx.setProperty(BridgeConstants.HTTP_REASON_PHRASE_SENT_FROM_BACKEND,
                httpResponse.getReasonPhrase());

        // Set rest of the properties
        responseMsgCtx.setProperty(MessageContext.TRANSPORT_IN,
                requestMsgCtx.getProperty(MessageContext.TRANSPORT_IN));
        responseMsgCtx.setProperty(BridgeConstants.INVOKED_REST, requestMsgCtx.isDoingREST());
        responseMsgCtx.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_IN_MESSAGES,
                requestMsgCtx.getProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_IN_MESSAGES));
        responseMsgCtx.setProperty(BridgeConstants.NON_BLOCKING_TRANSPORT, true);
        responseMsgCtx.setProperty(BridgeConstants.HTTP_CARBON_MESSAGE, httpResponse);
        responseMsgCtx.setProperty(BridgeConstants.HTTP_CLIENT_REQUEST_CARBON_MESSAGE,
                requestMsgCtx.getProperty(BridgeConstants.HTTP_CLIENT_REQUEST_CARBON_MESSAGE));
        responseMsgCtx.setProperty(BridgeConstants.HTTP_SOURCE_CONFIGURATION,
                requestMsgCtx.getProperty(BridgeConstants.HTTP_SOURCE_CONFIGURATION));

        // Set any transport headers received
        Map<String, String> headers = new HashMap<>();
        httpResponse.getHeaders().forEach(entry -> headers.put(entry.getKey(), entry.getValue()));
        responseMsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, headers);

        if (isStatusCode202(statusCode)) {
            responseMsgCtx.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, Boolean.TRUE);
            responseMsgCtx.setProperty(BridgeConstants.MESSAGE_BUILDER_INVOKED, Boolean.FALSE);
            responseMsgCtx.setProperty(BridgeConstants.SC_ACCEPTED, Boolean.TRUE);
        } else if (isErrorResponse(statusCode)) {
            responseMsgCtx.setProperty(BridgeConstants.FAULT_MESSAGE, BridgeConstants.TRUE);
        }

        if (isResponseExpectingBody(requestMsgCtx, statusCode)) {
            setContentTypeAndCharSetEncoding(responseMsgCtx);
            responseMsgCtx.setEnvelope(OMAbstractFactory.getSOAP11Factory().getDefaultEnvelope());
        } else {
            // there is no response entity-body
            responseMsgCtx.setProperty(PassThroughConstants.NO_ENTITY_BODY, Boolean.TRUE);
            responseMsgCtx.setEnvelope(new SOAP11Factory().getDefaultEnvelope());
        }
    }

    private void setContentTypeAndCharSetEncoding(MessageContext responseMsgCtx) {

        String contentType = httpResponse.getHeader(BridgeConstants.CONTENT_TYPE_HEADER);
        if (contentType == null) {
            // Server hasn't sent the header - Try to infer the content type
            contentType = inferContentType(responseMsgCtx);
        }

        if (contentType != null) {
            responseMsgCtx.setProperty(Constants.Configuration.CONTENT_TYPE, contentType);
            String charSetEncoding = BuilderUtil.getCharSetEncoding(contentType);
            responseMsgCtx.setProperty(Constants.Configuration.CHARACTER_SET_ENCODING,
                    contentType.indexOf("charset") > 0
                            ? charSetEncoding : MessageContext.DEFAULT_CHAR_SET_ENCODING);
        }
    }

    private boolean isResponseExpectingBody(MessageContext requestMsgCtx, int statusCode) {

        // According to RFC 7230 - HTTP/1.1 Message Syntax and Routing - Message Body Length, the following logic
        // was implemented.
        if (HttpUtils.isHEADRequest(requestMsgCtx)) {
            // Any response to a HEAD request
            return false;
        } else if (HttpUtils.isCONNECTRequest(requestMsgCtx)) {
            // Any 2xx (Successful) response to a CONNECT request
            return (statusCode / 100 != 2);
        }

        // Any response with a 1xx (Informational), 204 (No Content), or 304 (Not Modified) status code
        return statusCode >= HttpStatus.SC_OK
                && statusCode != HttpStatus.SC_NO_CONTENT
                && statusCode != HttpStatus.SC_NOT_MODIFIED
                && statusCode != HttpStatus.SC_RESET_CONTENT;
    }

    private String inferContentType(MessageContext responseMsgCtx) {

        // When the response from backend does not have a body(Content-Length is 0 )
        // and Content-Type is not set; ESB should not do any modification to the response and pass-through as it is.
        HttpHeaders headers = httpResponse.getHeaders();
        if (!checkIfResponseHaveBodyBasedOnContentLenAndTransferEncodingHeaders(headers, responseMsgCtx)) {
            return null;
        }

        // Try to get the content type from the axis configuration
        Parameter cTypeParam = requestMsgCtx.getConfigurationContext().getAxisConfiguration().getParameter(
                PassThroughConstants.CONTENT_TYPE);
        if (cTypeParam != null) {
            return cTypeParam.getValue().toString();
        }

        // If unable to determine the content type - Return application/octet-stream as the default value
        return PassThroughConstants.DEFAULT_CONTENT_TYPE;
    }

    private boolean checkIfResponseHaveBodyBasedOnContentLenAndTransferEncodingHeaders(HttpHeaders headers,
                                                                                       MessageContext responseMsgCtx) {

        String contentLengthHeader = headers.get(HTTP.CONTENT_LEN);
        boolean contentLengthHeaderPresent = contentLengthHeader != null;
        boolean transferEncodingHeaderPresent = headers.get(HTTP.TRANSFER_ENCODING) != null;

        if ((!contentLengthHeaderPresent && !transferEncodingHeaderPresent)
                || "0".equals(contentLengthHeader)) {
            responseMsgCtx.setProperty(PassThroughConstants.NO_ENTITY_BODY, Boolean.TRUE);
            return false;
        }
        return true;
    }

    private boolean handle202(MessageContext requestMsgContext) throws AxisFault {

        if (requestMsgContext.isPropertyTrue(BridgeConstants.IGNORE_SC_ACCEPTED)) {
            // We should not further process this 202 response - Ignore it
            return true;
        }

        MessageReceiver mr = requestMsgContext.getAxisOperation().getMessageReceiver();
        MessageContext responseMsgContext = requestMsgCtx.getOperationContext()
                .getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN);
        if (responseMsgContext == null || requestMsgContext.getOptions().isUseSeparateListener()) {
            // Most probably a response from a dual channel invocation
            // Inject directly into the SynapseCallbackReceiver
            requestMsgContext.setProperty(BridgeConstants.HTTP_202_RECEIVED, "true");
            mr.receive(requestMsgContext);
            return true;
        }
        return false;
    }

    private void handleLocationHeader(MessageContext responseMsgCtx) {
        // Special casing 301, 302, 303 and 307 scenario in following section. Not sure whether it's the correct fix,
        // but this fix makes it possible to do http --> https redirection.
        int statusCode = httpResponse.getHttpStatusCode();
        String originalURL = httpResponse.getHeader(BridgeConstants.LOCATION);

        if (originalURL != null && shouldRewriteLocationHeader(statusCode)) {
            URL url;
            String urlContext;
            try {
                url = new URL(originalURL);
                urlContext = url.getFile();
            } catch (MalformedURLException e) {
                //Fix ESBJAVA-3461 - In the case when relative path is sent should be handled
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Relative URL received for Location : " + originalURL, e);
                }
                urlContext = originalURL;
            }

            httpResponse.removeHeader(BridgeConstants.LOCATION);
            String servicePrefix = (String) requestMsgCtx.getProperty(BridgeConstants.SERVICE_PREFIX);
            if (servicePrefix != null) {
                if (urlContext == null) {
                    urlContext = "";
                } else if (urlContext.startsWith("/")) {
                    //Remove the preceding '/' character
                    urlContext = urlContext.substring(1);
                }
                httpResponse.setHeader(BridgeConstants.LOCATION, servicePrefix + urlContext);
            }
        }
        responseMsgCtx.setProperty(BridgeConstants.PRE_LOCATION_HEADER, originalURL);
    }

    private boolean shouldRewriteLocationHeader(int statusCode) {

        return !targetConfiguration.isPreserveHttpHeader(BridgeConstants.LOCATION)
                && !isStatusCode3xxRedirection(statusCode)
                && !isStatusCode201(statusCode);
    }

    private boolean isStatusCode1xx(int statusCode) {

        return statusCode / 100 == 1;
    }

    private boolean isStatusCode202(int statusCode) {

        return statusCode == HttpStatus.SC_ACCEPTED;
    }

    private boolean isStatusCode3xxRedirection(int statusCode) {

        return statusCode == HttpStatus.SC_MOVED_TEMPORARILY || statusCode == HttpStatus.SC_MOVED_PERMANENTLY
                || statusCode == HttpStatus.SC_SEE_OTHER || statusCode == HttpStatus.SC_TEMPORARY_REDIRECT;
    }

    private boolean isStatusCode201(int statusCode) {

        return statusCode == HttpStatus.SC_CREATED;
    }

    private boolean isErrorResponse(int statusCode) {

        return statusCode >= 400;
    }

    /**
     * Perform cleanup of ClientWorker.
     */
    private void cleanup() {
        //clean threadLocal variables
        MessageContext.destroyCurrentMessageContext();
    }
}