package org.apache.activemq.network;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.MessageNotWriteableException;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.Wait;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.class */
/**
 * Exercises a demand-forwarding bridge between two in-VM brokers and checks what happens to
 * messages that were forwarded to the remote broker but not consumed there.
 *
 * <p>The test sends ten messages to the local broker, consumes five of them through a consumer on
 * the remote broker, closes that consumer, and then asserts that the remaining five messages sit
 * on the remote broker, are not delivered to a consumer on the local broker, and can still be
 * drained by a new consumer on the remote broker.
 */
public class BrokerNetworkWithStuckMessagesTest extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(BrokerNetworkWithStuckMessagesTest.class);

    private BrokerService localBroker;
    private BrokerService remoteBroker;
    private DemandForwardingBridge bridge;

    protected TransportConnector connector;
    protected TransportConnector remoteConnector;

    /** Source of ids for connections, sessions, producers and consumers. */
    protected long idGenerator;
    /** Source of message sequence ids. */
    protected int msgIdGenerator;
    /** Source of suffixes for temporary destination names. */
    protected int tempDestGenerator;

    /** Brokers created by this test, keyed by broker name. */
    protected Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();
    /** Every stub connection handed out by the {@code create*Connection()} helpers. */
    protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>();

    /** Default receive timeout, in milliseconds, used by {@link #receiveMessage(StubConnection)}. */
    protected int maxWait = 4000;
    protected String queueName = "TEST";
    protected String amqDomain = "org.apache.activemq";

    /**
     * Starts both brokers and bridges them with a {@link DemandForwardingBridge} from the local
     * broker to the remote broker.
     */
    @Override
    protected void setUp() throws Exception {
        createBroker();
        createRemoteBroker();

        FileUtils.deleteDirectory(new File("activemq-data"));

        NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
        config.setBrokerName("local");
        config.setDispatchAsync(false);

        bridge = new DemandForwardingBridge(config, createTransport(), createRemoteTransport());
        bridge.setBrokerService(localBroker);
        bridge.start();

        waitForBridgeFormation();
    }

    /**
     * For every registered broker that has at least one network connector, waits until the first
     * connector reports an active bridge. Brokers without network connectors are skipped.
     */
    protected void waitForBridgeFormation() throws Exception {
        for (final BrokerService broker : brokers.values()) {
            if (broker.getNetworkConnectors().isEmpty()) {
                continue;
            }
            Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    NetworkConnector first = (NetworkConnector) broker.getNetworkConnectors().get(0);
                    return !first.activeBridges().isEmpty();
                }
            });
        }
    }

    /**
     * Stops the bridge and both brokers. Each component is stopped even when stopping an earlier
     * one throws, so a failure in one does not leave the others running.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (bridge != null) {
                bridge.stop();
            }
        } finally {
            try {
                if (localBroker != null) {
                    localBroker.stop();
                }
            } finally {
                if (remoteBroker != null) {
                    remoteBroker.stop();
                }
            }
        }
    }

    public void testBrokerNetworkWithStuckMessages() throws Exception {
        final int sendNumMessages = 10;
        final int receiveNumMessages = 5;

        // Producer on the local broker.
        StubConnection localConnection = createConnection();
        ConnectionInfo localConnectionInfo = createConnectionInfo();
        SessionInfo localSessionInfo = createSessionInfo(localConnectionInfo);
        ProducerInfo producerInfo = createProducerInfo(localSessionInfo);
        localConnection.send(localConnectionInfo);
        localConnection.send(localSessionInfo);
        localConnection.send(producerInfo);

        // Send the messages to the queue on the local broker.
        ActiveMQDestination localDestination = null;
        for (int i = 0; i < sendNumMessages; i++) {
            localDestination =
                createDestinationInfo(localConnection, localConnectionInfo, ActiveMQDestination.QUEUE_TYPE);
            localConnection.request(
                createMessage(producerInfo, localDestination, javax.jms.DeliveryMode.NON_PERSISTENT));
        }
        assertEquals(sendNumMessages, browseQueueWithJmx(localBroker).length);

        // Consumer on the remote broker.
        StubConnection remoteConnection = createRemoteConnection();
        ConnectionInfo remoteConnectionInfo = createConnectionInfo();
        SessionInfo remoteSessionInfo = createSessionInfo(remoteConnectionInfo);
        remoteConnection.send(remoteConnectionInfo);
        remoteConnection.send(remoteSessionInfo);
        ActiveMQDestination remoteDestination =
            createDestinationInfo(remoteConnection, remoteConnectionInfo, ActiveMQDestination.QUEUE_TYPE);
        ConsumerInfo firstRemoteConsumer = createConsumerInfo(remoteSessionInfo, remoteDestination);
        remoteConnection.send(firstRemoteConsumer);

        // Consume and acknowledge half of the messages on the remote broker.
        for (int i = 0; i < receiveNumMessages; i++) {
            Message received = receiveMessage(remoteConnection);
            assertNotNull(received);
            remoteConnection.send(createAck(firstRemoteConsumer, received, 1, MessageAck.STANDARD_ACK_TYPE));
            LOG.info("Found [{}] messages with JMX", browseQueueWithJmx(remoteBroker).length);
        }

        // Nothing is left on the local broker ...
        assertEquals(0, browseQueueWithJmx(localBroker).length);

        // ... and once the remote consumer goes away the rest sits on the remote broker.
        remoteConnection.send(firstRemoteConsumer.createRemoveCommand());
        assertEquals(receiveNumMessages, browseQueueWithJmx(remoteBroker).length);

        // A consumer on the local broker is not expected to receive any of them.
        localConnection.send(createConsumerInfo(localSessionInfo, localDestination));
        assertNull(receiveMessage(localConnection));

        // A new consumer on the remote broker drains the remaining messages.
        ConsumerInfo secondRemoteConsumer = createConsumerInfo(remoteSessionInfo, remoteDestination);
        remoteConnection.send(secondRemoteConsumer);
        int drained = 0;
        for (int i = 0; i < receiveNumMessages; i++) {
            Message received = receiveMessage(remoteConnection);
            assertNotNull(received);
            remoteConnection.send(createAck(secondRemoteConsumer, received, 1, MessageAck.STANDARD_ACK_TYPE));
            drained++;
        }
        assertEquals(receiveNumMessages, drained);

        // Give the acknowledgements time to be processed before browsing again.
        Thread.sleep(4000L);
        assertEquals(0, browseQueueWithJmx(remoteBroker).length);

        remoteConnection.send(secondRemoteConsumer.createRemoveCommand());
        localConnection.stop();
        remoteConnection.stop();
    }

    /**
     * Creates, starts and registers the non-persistent, JMX-enabled local broker.
     *
     * @return the started local broker
     */
    protected BrokerService createBroker() throws Exception {
        localBroker = new BrokerService();
        localBroker.setBrokerName("localhost");
        localBroker.setUseJmx(true);
        localBroker.setPersistenceAdapter((PersistenceAdapter) null);
        localBroker.setPersistent(false);
        connector = createConnector();
        localBroker.addConnector(connector);

        localBroker.start();
        localBroker.waitUntilStarted();

        // NOTE(review): the port is set after the broker has started; confirm it still takes effect.
        localBroker.getManagementContext().setConnectorPort(2221);
        brokers.put(localBroker.getBrokerName(), localBroker);
        return localBroker;
    }

    /**
     * Creates, starts and registers the non-persistent, JMX-enabled remote broker.
     *
     * @return the started remote broker
     */
    protected BrokerService createRemoteBroker() throws Exception {
        remoteBroker = new BrokerService();
        remoteBroker.setBrokerName("remotehost");
        remoteBroker.setUseJmx(true);
        remoteBroker.setPersistenceAdapter((PersistenceAdapter) null);
        remoteBroker.setPersistent(false);
        remoteConnector = createRemoteConnector();
        remoteBroker.addConnector(remoteConnector);

        // The broker has to be started explicitly; waitUntilStarted() only waits for a start
        // that is already under way, mirroring what createBroker() does for the local broker.
        remoteBroker.start();
        remoteBroker.waitUntilStarted();

        // NOTE(review): the port is set after the broker has started; confirm it still takes effect.
        remoteBroker.getManagementContext().setConnectorPort(2222);
        brokers.put(remoteBroker.getBrokerName(), remoteBroker);
        return remoteBroker;
    }

    /** Opens a transport to the local broker's connector. */
    protected Transport createTransport() throws Exception {
        return TransportFactory.connect(connector.getServer().getConnectURI());
    }

    /** Opens a transport to the remote broker's connector. */
    protected Transport createRemoteTransport() throws Exception {
        return TransportFactory.connect(remoteConnector.getServer().getConnectURI());
    }

    /** Binds a transport connector on {@link #getLocalURI()}. */
    protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException {
        return new TransportConnector(TransportFactory.bind(new URI(getLocalURI())));
    }

    /** Binds a transport connector on {@link #getRemoteURI()}. */
    protected TransportConnector createRemoteConnector() throws Exception, IOException, URISyntaxException {
        return new TransportConnector(TransportFactory.bind(new URI(getRemoteURI())));
    }

    protected String getRemoteURI() {
        return "vm://remotehost";
    }

    protected String getLocalURI() {
        return "vm://localhost";
    }

    /** Opens a stub connection to the local broker and records it in {@link #connections}. */
    protected StubConnection createConnection() throws Exception {
        Transport transport = TransportFactory.connect(connector.getServer().getConnectURI());
        StubConnection connection = new StubConnection(transport);
        connections.add(connection);
        return connection;
    }

    /** Opens a stub connection to the remote broker and records it in {@link #connections}. */
    protected StubConnection createRemoteConnection() throws Exception {
        Transport transport = TransportFactory.connect(remoteConnector.getServer().getConnectURI());
        StubConnection connection = new StubConnection(transport);
        connections.add(connection);
        return connection;
    }

    /**
     * Browses {@link #queueName} through a plain JMS {@link QueueBrowser}.
     *
     * <p>The connection is always opened against the local connector's URI; the {@code broker}
     * argument is not consulted.
     *
     * @param broker unused
     * @return the browsed messages, possibly empty
     */
    private Object[] browseQueueWithJms(BrokerService broker) throws Exception {
        Connection connection = null;
        Session session = null;
        Object[] messages;
        try {
            connection = new ActiveMQConnectionFactory(connector.getUri().toString()).createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            QueueBrowser browser = session.createBrowser(session.createQueue(queueName));

            ArrayList<Object> found = new ArrayList<Object>();
            Enumeration<?> pending = browser.getEnumeration();
            while (pending.hasMoreElements()) {
                found.add(pending.nextElement());
            }
            messages = found.toArray();
        } finally {
            // Close the connection even if closing the session fails.
            try {
                if (session != null) {
                    session.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
        LOG.info("+Browsed with JMS: {}", messages.length);
        return messages;
    }

    /**
     * Browses {@link #queueName} on the given broker through its {@link QueueViewMBean}.
     *
     * @param broker broker whose queue MBean is queried
     * @return the browsed messages as JMX composite data, possibly empty
     */
    private Object[] browseQueueWithJmx(BrokerService broker) throws Exception {
        Hashtable<String, String> keys = new Hashtable<String, String>();
        keys.put("BrokerName", broker.getBrokerName());
        keys.put("Type", "Queue");
        keys.put("Destination", queueName);
        ObjectName queueObjectName = ObjectName.getInstance(amqDomain, keys);

        QueueViewMBean queueView = (QueueViewMBean) broker.getManagementContext()
            .newProxyInstance(queueObjectName, QueueViewMBean.class, true);
        CompositeData[] messages = queueView.browse();
        LOG.info("+Browsed with JMX: {}", messages.length);
        return messages;
    }

    /** Builds a connection info whose connection id and client id are {@code connection:<n>}. */
    protected ConnectionInfo createConnectionInfo() throws Exception {
        ConnectionInfo info = new ConnectionInfo();
        info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
        info.setClientId(info.getConnectionId().getValue());
        return info;
    }

    /** Builds a session info on the given connection with the next generated id. */
    protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
        return new SessionInfo(connectionInfo, ++idGenerator);
    }

    /** Builds a producer info on the given session with the next generated id. */
    protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
        return new ProducerInfo(sessionInfo, ++idGenerator);
    }

    /**
     * Builds a non-browser consumer on {@code destination} with a prefetch of 1000 and
     * synchronous dispatch.
     */
    protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination)
            throws Exception {
        ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
        info.setBrowser(false);
        info.setDestination(destination);
        info.setPrefetchSize(1000);
        info.setDispatchAsync(false);
        return info;
    }

    /**
     * Builds an "add destination" command for a new destination named
     * {@code <connectionId>:<n>} of the given type.
     */
    protected DestinationInfo createTempDestinationInfo(ConnectionInfo connectionInfo, byte destinationType) {
        DestinationInfo info = new DestinationInfo();
        info.setConnectionId(connectionInfo.getConnectionId());
        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
        String name = info.getConnectionId() + ":" + (++tempDestGenerator);
        info.setDestination(ActiveMQDestination.createDestination(name, destinationType));
        return info;
    }

    /**
     * Resolves the destination to use for the given type. Types without the temporary bit map to
     * {@link #queueName}; temporary types are created on the broker via {@code connection}.
     */
    protected ActiveMQDestination createDestinationInfo(
            StubConnection connection, ConnectionInfo connectionInfo, byte destinationType) throws Exception {
        if ((destinationType & ActiveMQDestination.TEMP_MASK) == 0) {
            return ActiveMQDestination.createDestination(queueName, destinationType);
        }
        DestinationInfo info = createTempDestinationInfo(connectionInfo, destinationType);
        connection.send(info);
        return info.getDestination();
    }

    /**
     * Builds a text message and marks it persistent only when {@code deliveryMode} is
     * {@code DeliveryMode.PERSISTENT}.
     */
    protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
        Message message = createMessage(producerInfo, destination);
        message.setPersistent(deliveryMode == javax.jms.DeliveryMode.PERSISTENT);
        return message;
    }

    /** Builds a non-persistent text message with the next message id and a fixed payload. */
    protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
        message.setDestination(destination);
        message.setPersistent(false);
        try {
            message.setText("Test Message Payload.");
        } catch (MessageNotWriteableException ignored) {
            // Deliberately ignored: the message was created just above and is sent without a body
            // in the unlikely case the text cannot be set.
        }
        return message;
    }

    /**
     * Builds an acknowledgement of {@code count} messages ending at {@code message} for the given
     * consumer.
     */
    protected MessageAck createAck(ConsumerInfo consumerInfo, Message message, int count, byte ackType) {
        MessageAck ack = new MessageAck();
        ack.setAckType(ackType);
        ack.setConsumerId(consumerInfo.getConsumerId());
        ack.setDestination(message.getDestination());
        ack.setLastMessageId(message.getMessageId());
        ack.setMessageCount(count);
        return ack;
    }

    /** Receives the next message using {@link #maxWait} as the timeout. */
    public Message receiveMessage(StubConnection connection) throws InterruptedException {
        return receiveMessage(connection, maxWait);
    }

    /**
     * Polls the connection's dispatch queue until a {@link MessageDispatch} arrives, skipping any
     * other command. The timeout applies to each individual poll.
     *
     * @param connection connection whose dispatch queue is polled
     * @param timeout per-poll timeout in milliseconds
     * @return a copy of the dispatched message, or {@code null} when a poll times out or the
     *     dispatch carries no message
     */
    public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException {
        while (true) {
            Object command = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS);
            if (command == null) {
                return null;
            }
            if (!(command instanceof MessageDispatch)) {
                continue;
            }
            MessageDispatch dispatch = (MessageDispatch) command;
            if (dispatch.getMessage() == null) {
                return null;
            }
            // Hand back a copy so the caller does not share the dispatched instance.
            dispatch.setMessage(dispatch.getMessage().copy());
            dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
            return dispatch.getMessage();
        }
    }
}
