MessageUtils.java
/*
* Copyright (c) 2022. WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.synapse.transport.netty.util;
import com.google.gson.JsonParser;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMOutputFormat;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.AddressingHelper;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.description.WSDL2Constants;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.Handler;
import org.apache.axis2.engine.Phase;
import org.apache.axis2.transport.MessageFormatter;
import org.apache.axis2.transport.RequestResponseTransport;
import org.apache.axis2.transport.TransportUtils;
import org.apache.axis2.transport.http.ApplicationXMLFormatter;
import org.apache.axis2.transport.http.HTTPConstants;
import org.apache.axis2.transport.http.SOAPMessageFormatter;
import org.apache.axis2.transport.http.XFormURLEncodedFormatter;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.transport.netty.BridgeConstants;
import org.apache.synapse.transport.netty.config.NettyConfiguration;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;
import org.wso2.transport.http.netty.message.HttpMessageDataStreamer;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import javax.xml.stream.XMLStreamException;
/**
* Class MessageUtils contains helper methods that are used to build the payload.
*/
public class MessageUtils {
private static final Log LOG = LogFactory.getLog(MessageUtils.class);
private static final DeferredMessageBuilder messageBuilder = new DeferredMessageBuilder();
private static boolean noAddressingHandler = false;
private static volatile Handler addressingInHandler = null;
private static final Boolean forceMessageBuild;
private static final boolean forceXmlValidation;
private static final boolean forceJSONValidation;
static {
forceMessageBuild = NettyConfiguration.getInstance().isForcedMessageBuildEnabled();
forceXmlValidation = NettyConfiguration.getInstance().isForcedXmlMessageValidationEnabled();
forceJSONValidation = NettyConfiguration.getInstance().isForcedJSONMessageValidationEnabled();
}
public static void buildMessage(MessageContext msgCtx) throws IOException {
buildMessage(msgCtx, false);
}
public static void buildMessage(MessageContext msgCtx, boolean earlyBuild) throws IOException {
if (Boolean.TRUE.equals(msgCtx.getProperty(BridgeConstants.MESSAGE_BUILDER_INVOKED))) {
return;
}
if (msgCtx.getProperty(Constants.Configuration.CONTENT_TYPE) == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Content Type is null and the message is not build");
}
msgCtx.setProperty(BridgeConstants.MESSAGE_BUILDER_INVOKED,
Boolean.TRUE);
return;
}
if (!RequestResponseUtils.isHttpCarbonMessagePresent(msgCtx) || !forceMessageBuild) {
return;
}
HttpCarbonMessage httpCarbonMessage =
(HttpCarbonMessage) msgCtx.getProperty(BridgeConstants.HTTP_CARBON_MESSAGE);
HttpMessageDataStreamer httpMessageDataStreamer = new HttpMessageDataStreamer(httpCarbonMessage);
if (!HttpUtils.requestHasEntityBody(httpCarbonMessage)) {
return;
}
InputStream in = httpMessageDataStreamer.getInputStream();
ByteArrayOutputStream byteArrayOutputStream = null;
if (forceXmlValidation || forceJSONValidation) {
//read input stream to store raw data and create inputStream again.
//then the raw data can be logged after an error while building the message.
byteArrayOutputStream = new ByteArrayOutputStream();
IOUtils.copy(in, byteArrayOutputStream);
byteArrayOutputStream.flush();
// Open new InputStreams using the recorded bytes and assign to in
in = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
}
BufferedInputStream bufferedInputStream = (BufferedInputStream) msgCtx
.getProperty(BridgeConstants.BUFFERED_INPUT_STREAM);
if (bufferedInputStream != null) {
try {
bufferedInputStream.reset();
bufferedInputStream.mark(0);
} catch (Exception e) {
// just ignore the error
}
} else {
bufferedInputStream = new BufferedInputStream(in);
// Need to handle properly for the moment lets use around 100k
// buffer.
bufferedInputStream.mark(128 * 1024);
msgCtx.setProperty(BridgeConstants.BUFFERED_INPUT_STREAM,
bufferedInputStream);
}
OMElement element;
try {
element = messageBuilder.getDocument(msgCtx, bufferedInputStream);
if (element != null) {
msgCtx.setEnvelope(TransportUtils.createSOAPEnvelope(element));
msgCtx.setProperty(DeferredMessageBuilder.RELAY_FORMATTERS_MAP,
messageBuilder.getFormatters());
msgCtx.setProperty(BridgeConstants.MESSAGE_BUILDER_INVOKED, Boolean.TRUE);
earlyBuild = msgCtx.getProperty(BridgeConstants.RELAY_EARLY_BUILD) != null ? (Boolean) msgCtx
.getProperty(BridgeConstants.RELAY_EARLY_BUILD) : earlyBuild;
if (!earlyBuild) {
processAddressing(msgCtx);
}
//force validation makes sure that the xml is well formed (not having multi root element), and the json
// message is valid (not having any content after the final enclosing bracket)
if (forceXmlValidation || forceJSONValidation) {
String rawData = null;
try {
String contentType = (String) msgCtx.getProperty(Constants.Configuration.CONTENT_TYPE);
if (BridgeConstants.JSON_CONTENT_TYPE.equals(getMIMEContentType(contentType))
&& forceJSONValidation) {
rawData = byteArrayOutputStream.toString();
JsonParser jsonParser = new JsonParser();
jsonParser.parse(rawData);
} else {
msgCtx.getEnvelope().buildWithAttachments();
if (msgCtx.getEnvelope().getBody().getFirstElement() != null) {
msgCtx.getEnvelope().getBody().getFirstElement().buildNext();
}
}
} catch (Exception e) {
if (rawData == null) {
rawData = byteArrayOutputStream.toString();
}
LOG.error("Error while building the message.\n" + rawData);
msgCtx.setProperty(BridgeConstants.RAW_PAYLOAD, rawData);
throw e;
}
}
}
} catch (IOException | XMLStreamException e) {
msgCtx.setProperty(BridgeConstants.MESSAGE_BUILDER_INVOKED, Boolean.TRUE);
}
}
/**
* Get MIME content type out of content-type header.
* @param contentType content type header value
* @return MIME content type
*/
public static String getMIMEContentType(String contentType) {
String type;
int index = contentType.indexOf(';');
if (index > 0) {
type = contentType.substring(0, index);
} else {
int commaIndex = contentType.indexOf(',');
if (commaIndex > 0) {
type = contentType.substring(0, commaIndex);
} else {
type = contentType;
}
}
return type;
}
/**
* Function to check given inputstream is empty or not
* Used to check whether content of the payload input stream is empty or not.
*
* @param inputStream target inputstream
* @return true if it is a empty stream
* @throws IOException
*/
public static boolean isEmptyPayloadStream(InputStream inputStream) throws IOException {
boolean isEmptyPayload = true;
if (inputStream != null) {
// read ahead few characters to see if the stream is valid.
// Checks for all empty or all whitespace streams and if found sets isEmptyPayload to false. The while
// loop exits if found any character other than space or end of stream reached.
int c = inputStream.read();
while (c != -1) {
if (c != 32) {
//if not a space, should be some character in entity body
isEmptyPayload = false;
break;
}
c = inputStream.read();
}
inputStream.reset();
}
return isEmptyPayload;
}
/**
* This selects the formatter for a given message format based on the the content type of the received message.
* content-type to builder mapping can be specified through the Axis2.xml.
*
* @param msgContext axis2 MessageContext
* @return the formatter registered against the given content-type
*/
public static MessageFormatter getMessageFormatter(MessageContext msgContext) {
MessageFormatter messageFormatter = null;
String messageFormatString = getMessageFormatterProperty(msgContext);
messageFormatString = getContentTypeForFormatterSelection(messageFormatString, msgContext);
if (messageFormatString != null) {
messageFormatter = msgContext.getConfigurationContext()
.getAxisConfiguration().getMessageFormatter(messageFormatString);
if (LOG.isDebugEnabled()) {
LOG.debug("Message format is: " + messageFormatString
+ "; message formatter returned by AxisConfiguration: " + messageFormatter);
}
}
if (messageFormatter == null) {
messageFormatter = (MessageFormatter) msgContext.getProperty(Constants.Configuration.MESSAGE_FORMATTER);
if (messageFormatter != null) {
return messageFormatter;
}
}
if (messageFormatter == null) {
// If we are doing rest better default to Application/xml formatter
if (msgContext.isDoingREST()) {
String httpMethod = (String) msgContext.getProperty(Constants.Configuration.HTTP_METHOD);
if (Constants.Configuration.HTTP_METHOD_GET.equals(httpMethod) ||
Constants.Configuration.HTTP_METHOD_DELETE.equals(httpMethod)) {
return new XFormURLEncodedFormatter();
}
return new ApplicationXMLFormatter();
} else {
// Lets default to SOAP formatter
messageFormatter = new SOAPMessageFormatter();
}
}
return messageFormatter;
}
private static String getMessageFormatterProperty(MessageContext msgContext) {
String messageFormatterProperty = null;
Object property = msgContext
.getProperty(Constants.Configuration.MESSAGE_TYPE);
if (property != null) {
messageFormatterProperty = (String) property;
}
if (messageFormatterProperty == null) {
Parameter parameter = msgContext
.getParameter(Constants.Configuration.MESSAGE_TYPE);
if (parameter != null) {
messageFormatterProperty = (String) parameter.getValue();
}
}
return messageFormatterProperty;
}
private static String getContentTypeForFormatterSelection(String type, MessageContext msgContext) {
/*
* Handle special case where content-type : text/xml and SOAPAction = null consider as
* POX (REST) message not SOAP 1.1.
*
* 1.) it's required use the Builder associate with "application/xml" here but should not
* change content type of current message.
*/
String cType = type;
if (msgContext.isDoingREST() && HTTPConstants.MEDIA_TYPE_TEXT_XML.equals(type)) {
cType = HTTPConstants.MEDIA_TYPE_APPLICATION_XML;
msgContext.setProperty(Constants.Configuration.CONTENT_TYPE, HTTPConstants.MEDIA_TYPE_TEXT_XML);
}
return cType;
}
public static OMOutputFormat getOMOutputFormat(MessageContext msgContext) {
OMOutputFormat format;
if (msgContext.getProperty(BridgeConstants.MESSAGE_OUTPUT_FORMAT) != null) {
format = (OMOutputFormat) msgContext.getProperty(BridgeConstants.MESSAGE_OUTPUT_FORMAT);
} else {
format = new OMOutputFormat();
}
msgContext.setDoingMTOM(TransportUtils.doWriteMTOM(msgContext));
msgContext.setDoingSwA(TransportUtils.doWriteSwA(msgContext));
msgContext.setDoingREST(TransportUtils.isDoingREST(msgContext));
/*
* BridgeConstants.INVOKED_REST set to true here if isDoingREST is true -
* this enables us to check whether the original request to the endpoint was a
* REST request inside DeferredMessageBuilder (which we need to convert
* text/xml content type into application/xml if the request was not a SOAP
* request.
*/
if (msgContext.isDoingREST()) {
msgContext.setProperty(BridgeConstants.INVOKED_REST, true);
}
format.setSOAP11(msgContext.isSOAP11());
format.setDoOptimize(msgContext.isDoingMTOM());
format.setDoingSWA(msgContext.isDoingSwA());
format.setCharSetEncoding(TransportUtils.getCharSetEncoding(msgContext));
Object mimeBoundaryProperty = msgContext.getProperty(Constants.Configuration.MIME_BOUNDARY);
if (mimeBoundaryProperty != null) {
format.setMimeBoundary((String) mimeBoundaryProperty);
}
return format;
}
private static void processAddressing(MessageContext messageContext) throws AxisFault {
if (noAddressingHandler) {
return;
} else if (addressingInHandler == null) {
synchronized (messageBuilder) {
if (addressingInHandler == null) {
AxisConfiguration axisConfig = messageContext.getConfigurationContext()
.getAxisConfiguration();
List<Phase> phases = axisConfig.getInFlowPhases();
boolean handlerFound = false;
for (Phase phase : phases) {
if ("Addressing".equals(phase.getName())) {
List<Handler> handlers = phase.getHandlers();
for (Handler handler : handlers) {
if ("AddressingInHandler".equals(handler.getName())) {
addressingInHandler = handler;
handlerFound = true;
break;
}
}
break;
}
}
if (!handlerFound) {
noAddressingHandler = true;
return;
}
}
}
}
if (messageContext.getProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_IN_MESSAGES) == null) {
messageContext.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_IN_MESSAGES, "false");
}
Object disableAddressingForOutGoing = null;
if (messageContext.getProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES) != null) {
disableAddressingForOutGoing = messageContext
.getProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES);
}
addressingInHandler.invoke(messageContext);
if (disableAddressingForOutGoing != null) {
messageContext.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES,
disableAddressingForOutGoing);
}
if (messageContext.getAxisOperation() == null) {
return;
}
String mepString = messageContext.getAxisOperation().getMessageExchangePattern();
if (isOneWay(mepString)) {
Object requestResponseTransport = messageContext
.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
if (requestResponseTransport != null) {
Boolean disableAck = getDisableAck(messageContext);
if (disableAck == null || !disableAck) {
((RequestResponseTransport) requestResponseTransport)
.acknowledgeMessage(messageContext);
}
}
} else if (AddressingHelper.isReplyRedirected(messageContext)
&& AddressingHelper.isFaultRedirected(messageContext)) {
if (mepString.equals(WSDL2Constants.MEP_URI_IN_OUT)) {
// OR, if 2 way operation but the response is intended to not
// use the response channel of a 2-way transport
// then we don't need to keep the transport waiting.
Object requestResponseTransport = messageContext
.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
if (requestResponseTransport != null) {
// We should send an early ack to the transport whenever
// possible, but some modules need
// to use the back channel, so we need to check if they have
// disabled this code.
Boolean disableAck = getDisableAck(messageContext);
if (disableAck == null || !disableAck) {
((RequestResponseTransport) requestResponseTransport)
.acknowledgeMessage(messageContext);
}
}
}
}
}
private static Boolean getDisableAck(MessageContext msgContext) {
// We should send an early ack to the transport whenever possible, but
// some modules need
// to use the back channel, so we need to check if they have disabled
// this code.
Boolean disableAck = (Boolean) msgContext
.getProperty(Constants.Configuration.DISABLE_RESPONSE_ACK);
if (disableAck == null) {
disableAck = (Boolean) (msgContext.getAxisService() != null ? msgContext
.getAxisService().getParameterValue(
Constants.Configuration.DISABLE_RESPONSE_ACK) : null);
}
return disableAck;
}
private static boolean isOneWay(String mepString) {
return mepString.equals(WSDL2Constants.MEP_URI_IN_ONLY);
}
}