PassThroughMessageHandler.java
/*
* Copyright (c) 2022, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.synapse.transport.util;
import org.apache.axis2.context.MessageContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.transport.passthru.PassThroughConstants;
import org.apache.synapse.transport.passthru.Pipe;
import org.apache.synapse.transport.passthru.config.PassThroughConfiguration;
import org.apache.synapse.transport.passthru.util.RelayUtils;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import javax.xml.stream.XMLStreamException;
/**
* Message handler to handle the messages coming through Pass-Through transport.
*/
public class PassThroughMessageHandler implements MessageHandler {
private static final Log LOG = LogFactory.getLog(PassThroughMessageHandler.class);
@Override
public InputStream getMessageDataStream(MessageContext context) throws IOException {
Pipe pipe = (Pipe) context.getProperty(PassThroughConstants.PASS_THROUGH_PIPE);
if (pipe != null && context.getProperty(PassThroughConstants.BUFFERED_INPUT_STREAM) != null) {
BufferedInputStream bufferedInputStream =
(BufferedInputStream) context.getProperty(PassThroughConstants.BUFFERED_INPUT_STREAM);
try {
bufferedInputStream.reset();
bufferedInputStream.mark(0);
} catch (Exception e) {
//just ignore the error
}
return bufferedInputStream;
}
if (pipe != null) {
BufferedInputStream bufferedInputStream = new BufferedInputStream(pipe.getInputStream());
// Multiplied it by two because we always need a bigger read-limit than the buffer size.
bufferedInputStream.mark(PassThroughConfiguration.getInstance().getIOBufferSize() * 2);
OutputStream resetOutStream = pipe.resetOutputStream();
ReadableByteChannel inputChannel = Channels.newChannel(bufferedInputStream);
WritableByteChannel outputChannel = Channels.newChannel(resetOutStream);
if (!isMessageBiggerThanBuffer(inputChannel, outputChannel)) {
//TODO:need to find a proper solution
try {
bufferedInputStream.reset();
context.setProperty(PassThroughConstants.BUFFERED_INPUT_STREAM, bufferedInputStream);
MessageHandlerProvider.getMessageHandler(context).buildMessage(context);
} catch (Exception e) {
LOG.error("Error while building message", e);
}
return null;
}
try {
bufferedInputStream.reset();
} catch (Exception e) {
// just ignore the error
}
pipe.setRawSerializationComplete(true);
return bufferedInputStream;
}
return null;
}
@Override
public void buildMessage(MessageContext messageContext) throws XMLStreamException, IOException {
RelayUtils.buildMessage(messageContext);
}
@Override
public void buildMessage(MessageContext messageContext, boolean earlyBuild) throws XMLStreamException, IOException {
RelayUtils.buildMessage(messageContext, earlyBuild);
}
public boolean isMessageBiggerThanBuffer(final ReadableByteChannel src, final WritableByteChannel dest) throws IOException {
int bufferSizeSupport = PassThroughConfiguration.getInstance().getIOBufferSize();
// Added one to make sure temp buffer is always bigger than the io_buffer
final ByteBuffer buffer = ByteBuffer.allocate(bufferSizeSupport + 1);
while (src.read(buffer) != -1) {
if (bufferSizeSupport < buffer.position()) {
return false;
}
}
buffer.flip();
dest.write(buffer);
return true;
}
}