HttpRequestWorker.java

/*
 *  Copyright (c) 2022, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 *  WSO2 Inc. licenses this file to you under the Apache License,
 *  Version 2.0 (the "License"); you may not use this file except
 *  in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.synapse.transport.netty.listener;

import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axiom.soap.SOAPFactory;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.builder.BuilderUtil;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.transport.RequestResponseTransport;
import org.apache.axis2.transport.TransportUtils;
import org.apache.axis2.transport.http.HTTPConstants;
import org.apache.axis2.util.MessageContextBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpStatus;
import org.apache.http.protocol.HTTP;
import org.apache.synapse.commons.handlers.ConnectionId;
import org.apache.synapse.commons.handlers.HandlerResponse;
import org.apache.synapse.commons.handlers.MessageInfo;
import org.apache.synapse.commons.handlers.MessagingHandler;
import org.apache.synapse.commons.handlers.MessagingHandlerConstants;
import org.apache.synapse.commons.handlers.Protocol;
import org.apache.synapse.transport.netty.BridgeConstants;
import org.apache.synapse.transport.netty.config.SourceConfiguration;
import org.apache.synapse.transport.netty.util.HttpUtils;
import org.apache.synapse.transport.netty.util.RequestResponseUtils;
import org.apache.synapse.transport.nhttp.NhttpConstants;
import org.apache.synapse.transport.passthru.PassThroughConstants;
import org.wso2.transport.http.netty.contract.exceptions.ServerConnectorException;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;
import org.wso2.transport.http.netty.message.HttpCarbonRequest;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Objects;

import static org.apache.synapse.transport.netty.BridgeConstants.CONTENT_TYPE_HEADER;
import static org.apache.synapse.transport.netty.BridgeConstants.SOAP_ACTION_HEADER;

/**
 * {@code HttpRequestWorker} is the Thread that does the request processing.
 */
public class HttpRequestWorker implements Runnable {

    private static final Log LOG = LogFactory.getLog(HttpRequestWorker.class);
    private final HttpCarbonMessage incomingCarbonMsg;
    private final MessageContext msgContext;
    private final ConfigurationContext configurationContext;
    private final SourceConfiguration sourceConfiguration;
    private boolean requestHasEntityBody;

    public HttpRequestWorker(HttpCarbonMessage incomingCarbonMsg, SourceConfiguration sourceConfiguration) {

        this.sourceConfiguration = sourceConfiguration;
        this.incomingCarbonMsg = incomingCarbonMsg;
        this.configurationContext = sourceConfiguration.getConfigurationContext();
        this.msgContext = RequestResponseUtils.convertCarbonMsgToAxis2MsgCtx(incomingCarbonMsg, sourceConfiguration);
    }

    @Override
    public void run() {

        // first, get the URI of the underlying HttpCarbonMessage and generate the service prefix
        // and add to the message context.
        processHttpRequestUri();

        // check if the request is to fetch wsdl. If so, return the message flow without going through the normal flow.
        if (isRequestToFetchWSDL()) {
            return;
        }

        try {
            populateProperties();

            // invoke MessagingHandlers for further processing of the message.
            invokeHandlers();

            AxisEngine.receive(msgContext);
        } catch (AxisFault ex) {
            handleException("Error processing " + incomingCarbonMsg.getHttpMethod()
                    + " request for : " + incomingCarbonMsg.getProperty(BridgeConstants.TO), ex);
        }
        sendAck();
        cleanup();
    }

    /**
     * Check if the request is a WSDL query by invoking the registered {@code HttpGetRequestProcessor} for this
     * transport.
     *
     * @return true if the request is a WSDL query, otherwise false
     */
    private boolean isRequestToFetchWSDL() {

        String method = incomingCarbonMsg.getHttpMethod();

        // WSDL queries are normally GET or HEAD requests. Therefore, we need to invoke the http GET request processor
        // for such requests to handle WSDL requests.
        if (PassThroughConstants.HTTP_GET.equals(method)
                || PassThroughConstants.HTTP_HEAD.equals(method)
                || PassThroughConstants.HTTP_OPTIONS.equals(method)) {

            sourceConfiguration.getHttpGetRequestProcessor().process(incomingCarbonMsg, msgContext, true);
        }

        // if this request is to fetch WSDL, then the WSDL_REQUEST_HANDLED property should be set to true
        // in the message context by the HttpGetRequestProcessor.
        return Boolean.TRUE.equals((msgContext.getProperty(BridgeConstants.WSDL_REQUEST_HANDLED)));
    }

    /**
     * Get the URI of underlying HttpCarbonMessage and generate the service prefix and add to the message context.
     */
    private void processHttpRequestUri() {

        String servicePrefixIndex = "://";
        msgContext.setProperty(Constants.Configuration.HTTP_METHOD, incomingCarbonMsg.getHttpMethod().toUpperCase());
        String oriUri = (String) incomingCarbonMsg.getProperty(BridgeConstants.TO);
        String restUrlPostfix = RequestResponseUtils.getRestUrlPostfix(oriUri, configurationContext.getServicePath());

        String servicePrefix = oriUri.substring(0, oriUri.indexOf(restUrlPostfix));
        if (!servicePrefix.contains(servicePrefixIndex)) {
            InetSocketAddress localAddress = (InetSocketAddress) incomingCarbonMsg
                    .getProperty(org.wso2.transport.http.netty.contract.Constants.LOCAL_ADDRESS);
            if (localAddress != null) {
                servicePrefix = incomingCarbonMsg.getProperty(org.wso2.transport.http.netty.contract.Constants.PROTOCOL)
                        + servicePrefixIndex + localAddress.getHostName() + ":"
                        + incomingCarbonMsg.getProperty(org.wso2.transport.http.netty.contract.Constants.LISTENER_PORT)
                        + servicePrefix;
            }
        }
        msgContext.setProperty(BridgeConstants.SERVICE_PREFIX, servicePrefix);
        msgContext.setTo(new EndpointReference(restUrlPostfix));
        msgContext.setProperty(BridgeConstants.REST_URL_POSTFIX, restUrlPostfix);
    }

    /**
     * Populates required properties in the message context.
     *
     * @throws AxisFault if an error occurs while setting the SOAP envelope
     */
    private void populateProperties() throws AxisFault {

        this.requestHasEntityBody = HttpUtils.requestHasEntityBody(incomingCarbonMsg);
        if (!requestHasEntityBody) {
            msgContext.setProperty(PassThroughConstants.NO_ENTITY_BODY, Boolean.TRUE);
        }

        // set ContentType, MessageType, and CHARACTER_SET_ENCODING properties
        setContentTypeMessageTypeAndCharacterEncoding();

        msgContext.setProperty(HTTPConstants.HTTP_METHOD, incomingCarbonMsg.getHttpMethod().toUpperCase());
        msgContext.setTo(new EndpointReference((String) incomingCarbonMsg.getProperty(BridgeConstants.TO)));

        String contentType = msgContext.getProperty(Constants.Configuration.CONTENT_TYPE).toString();
        String soapAction = incomingCarbonMsg.getHeaders().get(SOAP_ACTION_HEADER);
        int soapVersion = RequestResponseUtils.populateSOAPVersion(msgContext, contentType);

        if (RequestResponseUtils.isRESTRequest(msgContext, contentType, soapVersion, soapAction)) {
            msgContext.setProperty(PassThroughConstants.REST_REQUEST_CONTENT_TYPE, contentType);
            msgContext.setDoingREST(true);
        }

        setSOAPAction(soapAction);
        setSOAPEnvelope(soapVersion);
    }

    private void setSOAPAction(String soapAction) {

        if ((soapAction != null) && soapAction.startsWith("\"") && soapAction.endsWith("\"")) {
            soapAction = soapAction.substring(1, soapAction.length() - 1);
            msgContext.setSoapAction(soapAction);
        }
    }

    private void setSOAPEnvelope(int soapVersion) throws AxisFault {

        SOAPEnvelope envelope;
        SOAPFactory fac;
        if (soapVersion == 1) {
            fac = OMAbstractFactory.getSOAP11Factory();
        } else {
            fac = OMAbstractFactory.getSOAP12Factory();
        }
        envelope = fac.getDefaultEnvelope();
        try {
            msgContext.setEnvelope(envelope);
        } catch (AxisFault axisFault) {
            LOG.error("Error occurred while setting the SOAP envelope to the request message context");
            throw axisFault;
        }
    }

    public void setContentTypeMessageTypeAndCharacterEncoding() {

        String contentTypeHeader = incomingCarbonMsg.getHeaders().get(CONTENT_TYPE_HEADER);
        String charSetEncoding;
        String contentType;
        String messageType;

        if (contentTypeHeader != null) {
            contentType = contentTypeHeader;
            if (HTTPConstants.MEDIA_TYPE_X_WWW_FORM.equals(contentTypeHeader)) {
                // if the Content-Type headers is application/x-www-form-urlencoded, then setting the message type as
                // application/xml.
                messageType = HTTPConstants.MEDIA_TYPE_APPLICATION_XML;
            } else {
                messageType = TransportUtils.getContentType(contentTypeHeader, msgContext);
            }
        } else {
            if (requestHasEntityBody) {
                Parameter param = sourceConfiguration.getConfigurationContext().getAxisConfiguration().
                        getParameter(BridgeConstants.DEFAULT_REQUEST_CONTENT_TYPE);
                if (param != null) {
                    contentType = param.getValue().toString();
                    messageType = contentType;
                } else {
                    // According to the RFC 7231 section 3.1.5.5, if the request containing a payload body does not
                    // have a Content-Type header field, then the recipient may assume a media type
                    // of "application/octet-stream"
                    contentType = BridgeConstants.CONTENT_TYPE_APPLICATION_OCTET_STREAM;
                    messageType = HTTPConstants.MEDIA_TYPE_APPLICATION_XML;
                }
            } else {
                String httpMethod = (String) this.msgContext.getProperty(BridgeConstants.HTTP_METHOD);
                if (HTTPConstants.HEADER_GET.equals(httpMethod) || HTTPConstants.HEADER_DELETE.equals(httpMethod)) {
                    contentType = HTTPConstants.MEDIA_TYPE_X_WWW_FORM;
                } else {
                    contentType = BridgeConstants.CONTENT_TYPE_APPLICATION_OCTET_STREAM;
                }
                messageType = HTTPConstants.MEDIA_TYPE_APPLICATION_XML;
            }
        }
        msgContext.setProperty(Constants.Configuration.CONTENT_TYPE, contentType);
        msgContext.setProperty(Constants.Configuration.MESSAGE_TYPE, messageType);
        charSetEncoding = BuilderUtil.getCharSetEncoding(contentType);
        msgContext.setProperty(Constants.Configuration.CHARACTER_SET_ENCODING, charSetEncoding);
    }

    private boolean isResponseWrittenOrSkipped() {

        String respWritten = (String) msgContext.getOperationContext().getProperty(
                Constants.RESPONSE_WRITTEN);
        return BridgeConstants.VALUE_TRUE.equals(respWritten) || "SKIP".equals(respWritten);
    }

    private boolean isSoapFault() {

        return msgContext.getProperty(BridgeConstants.FORCE_SOAP_FAULT) != null;
    }

    private boolean forceSCAccepted() {

        return msgContext.isPropertyTrue(BridgeConstants.FORCE_SC_ACCEPTED);
    }

    private boolean requestResponseTransportStatusEqualsToAcked() {

        RequestResponseTransport.RequestResponseTransportStatus transportStatus =
                ((RequestResponseTransport) msgContext.getProperty(RequestResponseTransport.TRANSPORT_CONTROL))
                        .getStatus();
        return RequestResponseTransport.RequestResponseTransportStatus.ACKED.equals(transportStatus);
    }

    private boolean nioAckRequested() {
        // TODO: check this
        return msgContext.isPropertyTrue(BridgeConstants.NIO_ACK_REQUESTED);
    }

    public boolean ackShouldSend() {

        return forceSCAccepted()
                || requestResponseTransportStatusEqualsToAcked()
                || nioAckRequested() ||
                !(isResponseWrittenOrSkipped() || isSoapFault());
    }

    /**
     * Sends a HTTP response to the client immediately after the current execution thread finishes, if the
     * 1. FORCE_SC_ACCEPTED property is true or
     * 2. A response is not written and no FORCE_SOAP_FAULT property is set or
     * 3. NIO-ACK-Requested property is set to true or
     * 4. RequestResponseTransportStatus is set to ACKED.
     */
    private void sendAck() {

        if (ackShouldSend()) {
            int statusCode;
            HttpResponseStatus responseStatus;
            if (!nioAckRequested()) {
                statusCode = HttpStatus.SC_ACCEPTED;
                responseStatus = HttpResponseStatus.ACCEPTED;
            } else {
                statusCode = Integer.parseInt(msgContext.getProperty(NhttpConstants.HTTP_SC).toString());
                responseStatus = HttpResponseStatus.valueOf(statusCode);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending ACK response with status " + statusCode + ", for MessageID : "
                        + msgContext.getMessageID());
            }
            sendResponse(statusCode, responseStatus, false, false, null, null);
        }
    }

    private void sendResponse(int statusCode, HttpResponseStatus responseStatus, boolean disableKeepAlive,
                              boolean contentAvailable, String content, String contentType) {

        HttpCarbonMessage clientRequest =
                (HttpCarbonRequest) this.msgContext.getProperty(BridgeConstants.HTTP_CLIENT_REQUEST_CARBON_MESSAGE);

        HttpCarbonMessage outboundResponse;
        try {
            outboundResponse = new HttpCarbonMessage(new DefaultHttpResponse(HttpVersion.HTTP_1_1, responseStatus));
            outboundResponse.setHttpStatusCode(statusCode);
            if (disableKeepAlive) {
                outboundResponse.setKeepAlive(false);
            }
            clientRequest.respond(outboundResponse);

        } catch (ServerConnectorException e) {
            LOG.error("Error occurred while submitting the Ack to the client", e);
            return;
        }

        if (!contentAvailable) {
            try {
                OutputStream messageOutputStream = HttpUtils.getHttpMessageDataStreamer(outboundResponse)
                        .getOutputStream();
                HttpUtils.writeEmptyBody(messageOutputStream);
            } catch (AxisFault e) {
                LOG.error("Error occurred while writing the Ack to the client", e);
            }
            return;
        }

        outboundResponse.setHeader(HTTP.CONTENT_TYPE, contentType);
        try (OutputStream outputStream =
                     HttpUtils.getHttpMessageDataStreamer(outboundResponse).getOutputStream()) {
            outputStream.write(content.getBytes());
        } catch (IOException ioException) {
            LOG.error("Error occurred while writing the response body to the client", ioException);
        }
    }

    private void handleException(String msg, Exception e) {

        if (Objects.isNull(e)) {
            LOG.error(msg);
            e = new Exception(msg);
        } else {
            LOG.error(msg, e);
        }

        try {
            MessageContext faultContext = MessageContextBuilder.createFaultMessageContext(msgContext, e);
            msgContext.setProperty(PassThroughConstants.FORCE_SOAP_FAULT, Boolean.TRUE);
            AxisEngine.sendFault(faultContext);

        } catch (Exception ex) {
            String body = "<html><body><h1>Failed to process the request</h1>"
                    + "<p>" + msg + "</p></body></html>";

            sendResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpResponseStatus.INTERNAL_SERVER_ERROR,
                    false, true, body, "text/html");
        }
    }

    /**
     * {@code MessagingHandler} is an extension point to intercept the inbound HTTP request for further processing.
     * This invokes the {@code handleSourceRequest} method of all the registered MessagingHandler instances to handle
     * the inbound request before going to the mediation flow.
     *
     * @return whether flow should continue further
     */
    private boolean invokeHandlers() {

        List<MessagingHandler> messagingHandlers = sourceConfiguration.getMessagingHandlers();
        if (Objects.isNull(messagingHandlers) || messagingHandlers.isEmpty()) {
            return true;
        }

        Protocol protocol;
        if (sourceConfiguration.getScheme().isSSL()) {
            protocol = Protocol.HTTPS;
        } else {
            protocol = Protocol.HTTP;
        }

        MessageInfo message = new MessageInfo(incomingCarbonMsg, protocol,
                new ConnectionId(incomingCarbonMsg.getSourceContext().channel().id().asShortText()));
        msgContext.setProperty(MessagingHandlerConstants.HANDLER_MESSAGE_CONTEXT, message);

        for (MessagingHandler handler: messagingHandlers) {
            HandlerResponse response = handler.handleRequest(msgContext);
            if (Objects.nonNull(response) && response.isError()) {
                LOG.error("Source request validation failed. " + response.getErrorResponseString());
                sendResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpResponseStatus.INTERNAL_SERVER_ERROR, true,
                        false, null, null);
                return false;
            }
        }
        return true;
    }

    /**
     * Perform cleanup of HttpRequestWorker.
     */
    private void cleanup() {
        //clean threadLocal variables
        MessageContext.destroyCurrentMessageContext();
    }
}