package org.apache.activemq.transport.stomp;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.transport.stomp.Stomp;

/* loaded from: input_file:org/apache/activemq/transport/stomp/StompTest.class */
public class StompTest extends CombinationTestSupport {
    private BrokerService broker;
    private TransportConnector connector;
    private Socket stompSocket;
    private ByteArrayOutputStream inputBuffer;
    private Connection connection;
    private Session session;
    private ActiveMQQueue queue;
    protected String bindAddress = "stomp://localhost:0";

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(false);
        this.connector = this.broker.addConnector(this.bindAddress);
        this.broker.start();
        this.stompSocket = createSocket(this.connector.getConnectUri());
        this.inputBuffer = new ByteArrayOutputStream();
        this.connection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
        this.session = this.connection.createSession(false, 1);
        this.queue = new ActiveMQQueue(getQueueName());
        this.connection.start();
    }

    protected Socket createSocket(URI uri) throws IOException {
        return new Socket("127.0.0.1", uri.getPort());
    }

    protected String getQueueName() {
        return new StringBuffer().append(getClass().getName()).append(ActiveMQDestination.PATH_SEPERATOR).append(getName()).toString();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void tearDown() throws Exception {
        this.connection.close();
        this.stompSocket.close();
        this.broker.stop();
    }

    public void sendFrame(String str) throws Exception {
        byte[] bytes = str.getBytes("UTF-8");
        OutputStream outputStream = this.stompSocket.getOutputStream();
        for (byte b : bytes) {
            outputStream.write(b);
        }
        outputStream.flush();
    }

    public String receiveFrame(long j) throws Exception {
        this.stompSocket.setSoTimeout((int) j);
        InputStream inputStream = this.stompSocket.getInputStream();
        while (true) {
            int read = inputStream.read();
            if (read < 0) {
                throw new IOException("socket closed.");
            }
            if (read == 0) {
                assertEquals("Expecting stomp frame to terminate with ��\n", inputStream.read(), 10);
                byte[] byteArray = this.inputBuffer.toByteArray();
                this.inputBuffer.reset();
                return new String(byteArray, "UTF-8");
            }
            this.inputBuffer.write(read);
        }
    }

    public void sendMessage(String str) throws Exception {
        sendMessage(str, "foo", "xyz");
    }

    public void sendMessage(String str, String str2, String str3) throws JMSException {
        MessageProducer createProducer = this.session.createProducer(this.queue);
        TextMessage createTextMessage = this.session.createTextMessage(str);
        createTextMessage.setStringProperty(str2, str3);
        createProducer.send(createTextMessage);
    }

    public void sendBytesMessage(byte[] bArr) throws Exception {
        MessageProducer createProducer = this.session.createProducer(this.queue);
        BytesMessage createBytesMessage = this.session.createBytesMessage();
        createBytesMessage.writeBytes(bArr);
        createProducer.send(createBytesMessage);
    }

    public void testConnect() throws Exception {
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\nrequest-id: 1\n\n��");
        String receiveFrame = receiveFrame(10000L);
        assertTrue(receiveFrame.startsWith(Stomp.Responses.CONNECTED));
        assertTrue(receiveFrame.indexOf("response-id:1") >= 0);
    }

    public void testSendMessage() throws Exception {
        MessageConsumer createConsumer = this.session.createConsumer(this.queue);
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
        assertTrue(receiveFrame(10000L).startsWith(Stomp.Responses.CONNECTED));
        sendFrame(new StringBuffer().append("SEND\ndestination:/queue/").append(getQueueName()).append("\n\n").append("Hello World").append(Stomp.NULL).toString());
        TextMessage textMessage = (TextMessage) createConsumer.receive(1000L);
        assertNotNull(textMessage);
        assertEquals("Hello World", textMessage.getText());
        assertTrue(Math.abs(System.currentTimeMillis() - textMessage.getJMSTimestamp()) < 1000);
    }

    public void testJMSXGroupIdCanBeSet() throws Exception {
        MessageConsumer createConsumer = this.session.createConsumer(this.queue);
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
        assertTrue(receiveFrame(10000L).startsWith(Stomp.Responses.CONNECTED));
        sendFrame(new StringBuffer().append("SEND\ndestination:/queue/").append(getQueueName()).append("\n").append("JMSXGroupID: TEST\n\n").append("Hello World").append(Stomp.NULL).toString());
        TextMessage textMessage = (TextMessage) createConsumer.receive(1000L);
        assertNotNull(textMessage);
        assertEquals("TEST", ((ActiveMQTextMessage) textMessage).getGroupID());
    }

    public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
        MessageConsumer createConsumer = this.session.createConsumer(this.queue, "foo = 'abc'");
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
        assertTrue(receiveFrame(10000L).startsWith(Stomp.Responses.CONNECTED));
        sendFrame(new StringBuffer().append("SEND\nfoo:abc\nbar:123\ndestination:/queue/").append(getQueueName()).append("\n\n").append("Hello World").append(Stomp.NULL).toString());
        TextMessage textMessage = (TextMessage) createConsumer.receive(1000L);
        assertNotNull(textMessage);
        assertEquals("Hello World", textMessage.getText());
        assertEquals("foo", "abc", textMessage.getStringProperty("foo"));
        assertEquals("bar", "123", textMessage.getStringProperty("bar"));
    }

    public void testSendMessageWithStandardHeaders() throws Exception {
        MessageConsumer createConsumer = this.session.createConsumer(this.queue);
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
        assertTrue(receiveFrame(10000L).startsWith(Stomp.Responses.CONNECTED));
        sendFrame(new StringBuffer().append("SEND\ncorrelation-id:c123\npriority:3\ntype:t345\nJMSXGroupID:abc\nfoo:abc\nbar:123\ndestination:/queue/").append(getQueueName()).append("\n\n").append("Hello World").append(Stomp.NULL).toString());
        TextMessage textMessage = (TextMessage) createConsumer.receive(1000L);
        assertNotNull(textMessage);
        assertEquals("Hello World", textMessage.getText());
        assertEquals("JMSCorrelationID", "c123", textMessage.getJMSCorrelationID());
        assertEquals("getJMSType", "t345", textMessage.getJMSType());
        assertEquals("getJMSPriority", 3, textMessage.getJMSPriority());
        assertEquals("foo", "abc", textMessage.getStringProperty("foo"));
        assertEquals("bar", "123", textMessage.getStringProperty("bar"));
        assertEquals("JMSXGroupID", "abc", textMessage.getStringProperty("JMSXGroupID"));
        assertEquals("GroupID", "abc", ((ActiveMQTextMessage) textMessage).getGroupID());
    }

    public void testSubscribeWithAutoAck() throws Exception {
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
        assertTrue(receiveFrame(100000L).startsWith(Stomp.Responses.CONNECTED));
        sendFrame(new StringBuffer().append("SUBSCRIBE\ndestination:/queue/").append(getQueueName()).append("\n").append("ack:auto\n\n").append(Stomp.NULL).toString());
        sendMessage(getName());
        assertTrue(receiveFrame(10000L).startsWith(Stomp.Responses.MESSAGE));
        sendFrame("DISCONNECT\n\n\n��");
    }

    public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
        assertTrue(receiveFrame(100000L).startsWith(Stomp.Responses.CONNECTED));
        sendFrame(new StringBuffer().append("SUBSCRIBE\ndestination:/queue/").append(getQueueName()).append("\n").append("ack:auto\n\n").append(Stomp.NULL).toString());
        sendBytesMessage(new byte[]{1, 2, 3, 4, 5});
        String receiveFrame = receiveFrame(10000L);
        assertTrue(receiveFrame.startsWith(Stomp.Responses.MESSAGE));
        Matcher matcher = Pattern.compile("Content-length:\\s*(\\d+)", 2).matcher(receiveFrame);
        assertTrue(matcher.find());
        assertEquals("5", matcher.group(1));
        assertFalse(Pattern.compile("type:\\s*null", 2).matcher(receiveFrame).find());
        sendFrame("DISCONNECT\n\n\n��");
    }

    public void testSubscribeWithMessageSentWithProperties() throws Exception {
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
        assertTrue(receiveFrame(100000L).startsWith(Stomp.Responses.CONNECTED));
        sendFrame(new StringBuffer().append("SUBSCRIBE\ndestination:/queue/").append(getQueueName()).append("\n").append("ack:auto\n\n").append(Stomp.NULL).toString());
        MessageProducer createProducer = this.session.createProducer(this.queue);
        TextMessage createTextMessage = this.session.createTextMessage("Hello World");
        createTextMessage.setStringProperty("s", "value");
        createTextMessage.setBooleanProperty("n", false);
        createTextMessage.setByteProperty("byte", (byte) 9);
        createTextMessage.setDoubleProperty("d", 2.0d);
        createTextMessage.setFloatProperty("f", 6.0f);
        createTextMessage.setIntProperty("i", 10);
        createTextMessage.setLongProperty("l", 121L);
        createTextMessage.setShortProperty("s", (short) 12);
        createProducer.send(createTextMessage);
        assertTrue(receiveFrame(10000L).startsWith(Stomp.Responses.MESSAGE));
        sendFrame("DISCONNECT\n\n\n��");
    }

    public void testMessagesAreInOrder() throws Exception {
        String[] strArr = new String[10];
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
        assertTrue(receiveFrame(100000L).startsWith(Stomp.Responses.CONNECTED));
        sendFrame(new StringBuffer().append("SUBSCRIBE\ndestination:/queue/").append(getQueueName()).append("\n").append("ack:auto\n\n").append(Stomp.NULL).toString());
        for (int i = 0; i < 10; i++) {
            strArr[i] = new StringBuffer().append(getName()).append(i).toString();
            sendMessage(strArr[i]);
        }
        for (int i2 = 0; i2 < 10; i2++) {
            assertTrue("Message not in order", receiveFrame(1000L).indexOf(strArr[i2]) >= 0);
        }
        waitForFrameToTakeEffect();
        for (int i3 = 0; i3 < 10; i3++) {
            strArr[i3] = new StringBuffer().append(getName()).append(":second:").append(i3).toString();
            sendMessage(strArr[i3]);
        }
        for (int i4 = 0; i4 < 10; i4++) {
            assertTrue("Message not in order", receiveFrame(1000L).indexOf(strArr[i4]) >= 0);
        }
        sendFrame("DISCONNECT\n\n\n��");
    }

    public void testSubscribeWithAutoAckAndSelector() throws Exception {
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
        assertTrue(receiveFrame(100000L).startsWith(Stomp.Responses.CONNECTED));
        sendFrame(new StringBuffer().append("SUBSCRIBE\ndestination:/queue/").append(getQueueName()).append("\n").append("selector: foo = 'zzz'\n").append("ack:auto\n\n").append(Stomp.NULL).toString());
        sendMessage("Ignored message", "foo", "1234");
        sendMessage("Real message", "foo", "zzz");
        String receiveFrame = receiveFrame(10000L);
        assertTrue(receiveFrame.startsWith(Stomp.Responses.MESSAGE));
        assertTrue(new StringBuffer().append("Should have received the real message but got: ").append(receiveFrame).toString(), receiveFrame.indexOf("Real message") > 0);
        sendFrame("DISCONNECT\n\n\n��");
    }

    public void testSubscribeWithClientAck() throws Exception {
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
        assertTrue(receiveFrame(10000L).startsWith(Stomp.Responses.CONNECTED));
        sendFrame(new StringBuffer().append("SUBSCRIBE\ndestination:/queue/").append(getQueueName()).append("\n").append("ack:client\n\n").append(Stomp.NULL).toString());
        sendMessage(getName());
        assertTrue(receiveFrame(10000L).startsWith(Stomp.Responses.MESSAGE));
        sendFrame("DISCONNECT\n\n\n��");
        TextMessage textMessage = (TextMessage) this.session.createConsumer(this.queue).receive(1000L);
        assertNotNull(textMessage);
        assertTrue(textMessage.getJMSRedelivered());
    }

    public void testUnsubscribe() throws Exception {
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
        assertTrue(receiveFrame(100000L).startsWith(Stomp.Responses.CONNECTED));
        sendFrame(new StringBuffer().append("SUBSCRIBE\ndestination:/queue/").append(getQueueName()).append("\n").append("ack:auto\n\n").append(Stomp.NULL).toString());
        sendMessage("first message");
        assertTrue(receiveFrame(1000L).startsWith(Stomp.Responses.MESSAGE));
        sendFrame(new StringBuffer().append("UNSUBSCRIBE\ndestination:/queue/").append(getQueueName()).append("\n").append("\n\n").append(Stomp.NULL).toString());
        waitForFrameToTakeEffect();
        sendMessage("second message");
        try {
            log.info(new StringBuffer().append("Received frame: ").append(receiveFrame(1000L)).toString());
            fail("No message should have been received since subscription was removed");
        } catch (SocketTimeoutException e) {
        }
    }

    public void testTransactionCommit() throws Exception {
        MessageConsumer createConsumer = this.session.createConsumer(this.queue);
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
        assertTrue(receiveFrame(1000L).startsWith(Stomp.Responses.CONNECTED));
        sendFrame("BEGIN\ntransaction: tx1\n\n\n��");
        sendFrame(new StringBuffer().append("SEND\ndestination:/queue/").append(getQueueName()).append("\n").append("transaction: tx1\n").append("\n\n").append("Hello World").append(Stomp.NULL).toString());
        sendFrame("COMMIT\ntransaction: tx1\n\n\n��");
        waitForFrameToTakeEffect();
        assertNotNull("Should have received a message", (TextMessage) createConsumer.receive(1000L));
    }

    public void testTransactionRollback() throws Exception {
        MessageConsumer createConsumer = this.session.createConsumer(this.queue);
        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n��");
        assertTrue(receiveFrame(1000L).startsWith(Stomp.Responses.CONNECTED));
        sendFrame("BEGIN\ntransaction: tx1\n\n\n��");
        sendFrame(new StringBuffer().append("SEND\ndestination:/queue/").append(getQueueName()).append("\n").append("transaction: tx1\n").append("\n").append("first message").append(Stomp.NULL).toString());
        sendFrame("ABORT\ntransaction: tx1\n\n\n��");
        sendFrame("BEGIN\ntransaction: tx1\n\n\n��");
        sendFrame(new StringBuffer().append("SEND\ndestination:/queue/").append(getQueueName()).append("\n").append("transaction: tx1\n").append("\n").append("second message").append(Stomp.NULL).toString());
        sendFrame("COMMIT\ntransaction: tx1\n\n\n��");
        waitForFrameToTakeEffect();
        TextMessage textMessage = (TextMessage) createConsumer.receive(1000L);
        assertNotNull(textMessage);
        assertEquals("second message", textMessage.getText().trim());
    }

    protected void waitForFrameToTakeEffect() throws InterruptedException {
        Thread.sleep(2000L);
    }
}
