package org.apache.synapse.transport.amqp;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.axiom.om.OMOutputFormat;
import org.apache.axiom.om.util.UUIDGenerator;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.transport.MessageFormatter;
import org.apache.axis2.transport.OutTransportInfo;
import org.apache.axis2.transport.TransportUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.exchange.ExchangeDefaults;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.transport.DeliveryProperties;
import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.transport.Option;
import org.apache.qpidity.transport.ReplyTo;
import org.apache.synapse.transport.base.AbstractTransportSender;
import org.apache.synapse.transport.base.BaseUtils;
import org.apache.synapse.transport.jms.JMSUtils;

/* loaded from: input_file:WEB-INF/lib/synapse-transports-1.2.wso2v2.jar:org/apache/synapse/transport/amqp/AMQPSender.class */
public class AMQPSender extends AbstractTransportSender {
    public static final String TRANSPORT_NAME = "amqp";
    private static final Log log = LogFactory.getLog(AMQPSender.class);
    private Map<String, ConnectionDetails> _connectionDetails = new HashMap();

    /* loaded from: input_file:WEB-INF/lib/synapse-transports-1.2.wso2v2.jar:org/apache/synapse/transport/amqp/AMQPSender$ConnectionDetails.class */
    private class ConnectionDetails {
        private Connection _conn;
        private Session _session;

        public ConnectionDetails(Connection connection) {
            this._conn = connection;
        }

        public Session getSession() {
            if (this._session != null) {
                this._session = this._conn.createSession(0L);
            }
            return this._session;
        }
    }

    @Override // org.apache.synapse.transport.base.AbstractTransportSender, org.apache.axis2.transport.TransportSender
    public void init(ConfigurationContext configurationContext, TransportOutDescription transportOutDescription) throws AxisFault {
        setTransportName("amqp");
        super.init(configurationContext, transportOutDescription);
    }

    @Override // org.apache.synapse.transport.base.AbstractTransportSender
    public void sendMessage(MessageContext messageContext, String str, OutTransportInfo outTransportInfo) throws AxisFault {
        AMQPOutTransportInfo aMQPOutTransportInfo = null;
        ConnectionDetails connectionDetails = null;
        Session session = null;
        if (str != null) {
            aMQPOutTransportInfo = new AMQPOutTransportInfo(str);
        } else if (outTransportInfo != null && (outTransportInfo instanceof AMQPOutTransportInfo)) {
            aMQPOutTransportInfo = (AMQPOutTransportInfo) outTransportInfo;
        }
        if (this._connectionDetails.containsKey(aMQPOutTransportInfo.getConnectionURL())) {
            connectionDetails = this._connectionDetails.get(aMQPOutTransportInfo.getConnectionURL());
        } else {
            Connection createConnection = Client.createConnection();
            try {
                createConnection.connect(aMQPOutTransportInfo.getConnectionURL());
                this._connectionDetails.put(aMQPOutTransportInfo.getConnectionURL(), new ConnectionDetails(createConnection));
            } catch (Exception e) {
                throw new AMQPSynapseException("Error creating a connection to the broker", e);
            }
        }
        if (connectionDetails != null) {
            session = connectionDetails.getSession();
        }
        byte[] bArr = null;
        try {
            bArr = createMessageData(messageContext);
        } catch (AMQPSynapseException e2) {
            handleException("Error creating a message from the axis message context", e2);
        }
        boolean waitForSynchronousResponse = waitForSynchronousResponse(messageContext);
        DeliveryProperties deliveryProperties = new DeliveryProperties();
        MessageProperties messageProperties = new MessageProperties();
        fillMessageHeaders(messageContext, aMQPOutTransportInfo, session, waitForSynchronousResponse, deliveryProperties, messageProperties);
        synchronized (session) {
            session.header(messageProperties, deliveryProperties);
            session.data(bArr);
            session.endData();
        }
        if (waitForSynchronousResponse) {
            waitForResponseAndProcess(session, messageProperties, messageContext);
        }
    }

    private void fillMessageHeaders(MessageContext messageContext, AMQPOutTransportInfo aMQPOutTransportInfo, Session session, boolean z, DeliveryProperties deliveryProperties, MessageProperties messageProperties) {
        deliveryProperties.setExchange(aMQPOutTransportInfo.getExchangeName());
        deliveryProperties.setRoutingKey(aMQPOutTransportInfo.getRoutingKey());
        try {
            messageProperties.setContentType(TransportUtils.getMessageFormatter(messageContext).getContentType(messageContext, BaseUtils.getOMOutputFormat(messageContext), messageContext.getSoapAction()));
            HashMap hashMap = new HashMap();
            if (messageContext.isServerSide()) {
                hashMap.put("SOAPAction", (String) messageContext.getProperty("SOAPAction"));
            } else {
                String action = messageContext.getOptions().getAction();
                if (action != null) {
                    hashMap.put("SOAPAction", action);
                }
            }
            messageProperties.setApplicationHeaders(hashMap);
            Map map = (Map) messageContext.getProperty(MessageContext.TRANSPORT_HEADERS);
            if (map != null) {
                for (String str : map.keySet()) {
                    if (AMQPConstants.AMQP_CORELATION_ID.equals(str)) {
                        messageProperties.setCorrelationId((String) map.get(AMQPConstants.AMQP_CORELATION_ID));
                    } else if (AMQPConstants.AMQP_DELIVERY_MODE.equals(str)) {
                        Object obj = map.get(AMQPConstants.AMQP_DELIVERY_MODE);
                        if (obj instanceof Short) {
                            deliveryProperties.setDeliveryMode(((Short) obj).shortValue());
                        } else if (obj instanceof Integer) {
                            deliveryProperties.setDeliveryMode(((Integer) obj).shortValue());
                        } else if (obj instanceof String) {
                            try {
                                deliveryProperties.setDeliveryMode(Short.parseShort((String) obj));
                            } catch (NumberFormatException e) {
                                log.warn("Invalid delivery mode ignored : " + obj, e);
                            }
                        } else {
                            log.warn("Invalid delivery mode ignored : " + obj);
                        }
                    } else if (AMQPConstants.AMQP_EXPIRATION.equals(str)) {
                        deliveryProperties.setExpiration(Long.parseLong((String) map.get(AMQPConstants.AMQP_EXPIRATION)));
                    } else if (AMQPConstants.AMQP_MESSAGE_ID.equals(str)) {
                        messageProperties.setMessageId((String) map.get(AMQPConstants.AMQP_MESSAGE_ID));
                    } else if (AMQPConstants.AMQP_PRIORITY.equals(str)) {
                        deliveryProperties.setPriority(Short.parseShort((String) map.get(AMQPConstants.AMQP_PRIORITY)));
                    } else if (AMQPConstants.AMQP_TIMESTAMP.equals(str)) {
                        deliveryProperties.setTimestamp(Long.parseLong((String) map.get(AMQPConstants.AMQP_TIMESTAMP)));
                    } else {
                        hashMap.put(str, map.get(str));
                    }
                }
            }
            if (messageContext.getProperty(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME) != null) {
                messageProperties.setReplyTo(new ReplyTo((String) messageContext.getProperty(AMQPConstants.AMQP_REPLY_TO_EXCHANGE_NAME), messageContext.getProperty(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY) != null ? (String) messageContext.getProperty(AMQPConstants.AMQP_REPLY_TO_ROUTING_KEY) : null));
            }
            if (z) {
                if (z && messageProperties.getCorrelationId() == null) {
                    if (messageContext.getProperty(AMQPConstants.AMQP_CORELATION_ID) != null) {
                        messageProperties.setCorrelationId((String) messageContext.getProperty(AMQPConstants.AMQP_CORELATION_ID));
                    } else {
                        messageProperties.setCorrelationId(UUIDGenerator.getUUID());
                    }
                }
                if (messageProperties.getReplyTo() == null) {
                    String str2 = "Queue_" + messageProperties.getCorrelationId();
                    synchronized (session) {
                        session.queueDeclare(str2, null, null, Option.AUTO_DELETE, Option.EXCLUSIVE);
                        session.queueBind(str2, ExchangeDefaults.DIRECT_EXCHANGE_NAME, str2, null);
                        session.sync();
                    }
                    messageProperties.replyTo(new ReplyTo(ExchangeDefaults.DIRECT_EXCHANGE_NAME, str2));
                }
            }
        } catch (AxisFault e2) {
            throw new AMQPSynapseException("Unable to get the message formatter to use");
        }
    }

    private byte[] createMessageData(MessageContext messageContext) {
        OMOutputFormat oMOutputFormat = BaseUtils.getOMOutputFormat(messageContext);
        try {
            MessageFormatter messageFormatter = TransportUtils.getMessageFormatter(messageContext);
            messageFormatter.getContentType(messageContext, oMOutputFormat, messageContext.getSoapAction());
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                messageFormatter.writeTo(messageContext, oMOutputFormat, byteArrayOutputStream, true);
                byteArrayOutputStream.flush();
                return byteArrayOutputStream.toByteArray();
            } catch (IOException e) {
                throw new AMQPSynapseException("IO Error while creating message", e);
            }
        } catch (AxisFault e2) {
            throw new AMQPSynapseException("Unable to get the message formatter to use", e2);
        }
    }

    private void waitForResponseAndProcess(Session session, MessageProperties messageProperties, MessageContext messageContext) throws AxisFault {
        long j = 30000;
        String str = (String) messageContext.getProperty(AMQPConstants.AMQP_WAIT_REPLY);
        if (str != null) {
            j = Long.valueOf(str).longValue();
        }
        String routingKey = messageProperties.getReplyTo().getRoutingKey();
        MessageManager messageManager = new MessageManager(session, routingKey, messageProperties.getCorrelationId());
        session.messageSubscribe(messageProperties.getReplyTo().getRoutingKey(), routingKey, (short) 1, (short) 0, new MessagePartListenerAdapter(messageManager), null, Option.NO_OPTION);
        Message receive = messageManager.receive(j);
        if (receive != null) {
            processSyncResponse(messageContext, receive);
        } else {
            log.warn("Did not receive a response within " + j + " ms to destination : " + messageProperties.getReplyTo().getRoutingKey() + " with correlation ID : " + messageProperties.getCorrelationId());
        }
    }

    private void processSyncResponse(MessageContext messageContext, Message message) throws AxisFault {
        MessageContext createResponseMessageContext = createResponseMessageContext(messageContext);
        Map transportHeaders = AMQPUtils.getTransportHeaders(message);
        createResponseMessageContext.setProperty(MessageContext.TRANSPORT_HEADERS, transportHeaders);
        createResponseMessageContext.setServerSide(false);
        String property = JMSUtils.getInstace().getProperty(message, "Content-Type");
        AMQPUtils.getInstace().setSOAPEnvelope(message, createResponseMessageContext, property);
        createResponseMessageContext.setServerSide(true);
        handleIncomingMessage(createResponseMessageContext, transportHeaders, (String) transportHeaders.get("SOAPAction"), property);
    }
}
