/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.transport;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.ActiveMQPrefetchPolicy;
import org.activemq.advisories.ConnectionAdvisor;
import org.activemq.advisories.ConnectionAdvisoryEvent;
import org.activemq.advisories.ConnectionAdvisoryEventListener;
import org.activemq.broker.BrokerClient;
import org.activemq.broker.BrokerContainer;
import org.activemq.broker.ConsumerInfoListener;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.BrokerInfo;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.Receipt;
import org.activemq.service.MessageContainerManager;
import org.activemq.service.Service;
import org.activemq.transport.NetworkConnector;
import org.activemq.transport.NetworkMessageBridge;
import org.activemq.transport.TransportChannel;
import org.activemq.transport.TransportStatusEvent;
import org.activemq.transport.TransportStatusEventListener;
import org.activemq.transport.composite.CompositeTransportChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Links the local broker to one remote broker.
 *
 * <p>The channel owns a local (in-VM) connection and a remote connection and
 * keeps one {@link NetworkMessageBridge} per destination, stored in
 * {@link #topicConsumerMap} or {@link #queueConsumerMap} keyed by the
 * destination's physical name. Bridges are created when a local consumer
 * appears ({@link #onConsumerInfo}) and released when the last one goes away.
 *
 * <p>Lifecycle flags ({@code started}, {@code connected}, {@code stopped}) are
 * {@link SynchronizedBoolean}s; {@code connected} additionally serves as the
 * monitor that deferred consumer-info tasks wait on.
 */
public class NetworkChannel
implements Service,
ConsumerInfoListener,
ConnectionAdvisoryEventListener,
TransportStatusEventListener {
    private static final Log log = LogFactory.getLog(NetworkChannel.class);
    protected String uri;
    protected BrokerContainer brokerContainer;
    protected ActiveMQConnection localConnection;
    protected ActiveMQConnection remoteConnection;
    /** Bridges for topic destinations, keyed by physical destination name. */
    protected ConcurrentHashMap topicConsumerMap;
    /** Bridges for queue destinations, keyed by physical destination name. */
    protected ConcurrentHashMap queueConsumerMap;
    protected String remoteUserName;
    protected String remotePassword;
    protected String remoteBrokerName;
    protected String remoteClusterName;
    protected int maximumRetries = 0;
    protected long reconnectSleepTime = 2000L;
    protected PooledExecutor threadPool;
    /** True when the channel was built around an already-open transport channel. */
    private boolean remote = false;
    private SynchronizedBoolean started = new SynchronizedBoolean(false);
    private SynchronizedBoolean connected = new SynchronizedBoolean(false);
    private SynchronizedBoolean stopped = new SynchronizedBoolean(false);
    private ConnectionAdvisor connectionAdvisor;
    private ActiveMQPrefetchPolicy localPrefetchPolicy;
    private ActiveMQPrefetchPolicy remotePrefetchPolicy;

    /**
     * Creates an unconfigured channel with empty bridge maps.
     */
    public NetworkChannel() {
        this.topicConsumerMap = new ConcurrentHashMap();
        this.queueConsumerMap = new ConcurrentHashMap();
    }

    /**
     * Creates an unconfigured channel that runs its background work on the given pool.
     *
     * @param tp executor used for asynchronous start-up and bridge shutdown
     */
    public NetworkChannel(PooledExecutor tp) {
        this();
        this.threadPool = tp;
    }

    /**
     * Creates a channel that will connect to {@code uri} when started.
     *
     * @param connector       supplies the thread pool
     * @param brokerContainer container of the local broker
     * @param uri             address of the remote broker
     */
    public NetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, String uri) {
        this(connector.threadPool);
        this.brokerContainer = brokerContainer;
        this.uri = uri;
    }

    /**
     * Creates a channel on top of an existing transport channel, immediately
     * opening and starting the remote connection and announcing the local
     * broker's name and cluster over that channel.
     *
     * <p>Note that {@code remoteUserName} and {@code remotePassword} have not
     * been set at this point, so the connection is created with {@code null}
     * credentials.
     *
     * @param connector         supplies the thread pool
     * @param brokerContainer   container of the local broker
     * @param channel           already-established transport to the remote broker
     * @param remoteBrokerName  name of the remote broker
     * @param remoteclusterName cluster name of the remote broker
     * @throws JMSException if the remote connection cannot be configured or started
     */
    public NetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, TransportChannel channel, String remoteBrokerName, String remoteclusterName) throws JMSException {
        this(connector.threadPool);
        this.brokerContainer = brokerContainer;
        this.uri = "";
        this.remoteBrokerName = remoteBrokerName;
        this.remoteClusterName = remoteclusterName;
        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory();
        fac.setJ2EEcompliant(false);
        fac.setTurboBoost(true);
        this.remoteConnection = new ActiveMQConnection(fac, this.remoteUserName, this.remotePassword, channel);
        this.remoteConnection.setClientID("Boondocks:" + this.remoteClusterName + ":" + remoteBrokerName);
        this.remoteConnection.setQuickClose(true);
        this.remoteConnection.start();
        BrokerInfo info = new BrokerInfo();
        info.setBrokerName(brokerContainer.getBroker().getBrokerName());
        info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
        channel.asyncSend(info);
        this.remote = true;
    }

    /**
     * Reacts to transport status changes of the remote connection.
     *
     * <p>Status 1 or 3 marks the channel connected (re-creating the remote
     * connection first if there is none); status 2 releases the remote
     * connection and marks the channel disconnected. NOTE(review): the numeric
     * codes were inlined by the decompiler - presumably CONNECTED, RECONNECTED
     * and DISCONNECTED of {@link TransportStatusEvent}; confirm against that class.
     *
     * @param event the status event; {@code null} is ignored
     */
    public void statusChanged(TransportStatusEvent event) {
        if (event == null) {
            return;
        }
        int status = event.getChannelStatus();
        if (status == 1 || status == 3) {
            if (this.remoteConnection == null) {
                try {
                    this.initializeRemote();
                }
                catch (JMSException e) {
                    log.error("Failed to initialize remote connection", e);
                }
            }
            this.connected.set(true);
        } else if (status == 2) {
            try {
                this.releaseRemote();
            }
            catch (JMSException e) {
                log.warn("Failed to release remote connection", e);
            }
            this.connected.set(false);
        }
    }

    /**
     * Marks the channel connected and wakes every task waiting on the
     * {@code connected} monitor.
     */
    private void doSetConnected() {
        synchronized (this.connected) {
            this.connected.set(true);
            this.connected.notifyAll();
        }
    }

    /**
     * @return a short description containing the URI and the remote broker name
     */
    public String toString() {
        return "NetworkChannel{ , uri = '" + this.uri + "' " + ", remoteBrokerName = '" + this.remoteBrokerName + "' " + " }";
    }

    /**
     * Starts the channel if it is not already started. Connection set-up and
     * the initial subscriptions run asynchronously on the thread pool, so this
     * method returns before the channel is connected.
     */
    public void start() {
        if (!this.started.commit(false, true)) {
            return;
        }
        try {
            this.stopped.set(false);
            this.threadPool.execute(new Runnable(){

                public void run() {
                    Thread worker = Thread.currentThread();
                    String originalName = worker.getName();
                    try {
                        worker.setName("NetworkChannel Initiator to " + NetworkChannel.this.uri);
                        NetworkChannel.this.initialize();
                        NetworkChannel.this.startSubscriptions();
                        log.info("Started NetworkChannel to " + NetworkChannel.this.uri);
                    }
                    catch (JMSException jmsEx) {
                        log.error("Failed to start NetworkChannel: " + NetworkChannel.this.uri, jmsEx);
                    }
                    finally {
                        // pool threads are reused, so put the original name back
                        worker.setName(originalName);
                    }
                }
            });
        }
        catch (InterruptedException e) {
            log.warn("Failed to start - interuppted", e);
            // keep the interrupt visible to the caller
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops the channel if it is started: stops and forgets every topic and
     * queue bridge, then closes the remote and the local connection.
     *
     * <p>Bridges are stopped before their maps are cleared (clearing first
     * would leave nothing to iterate) and before the connections are closed.
     * Both connection references are reset even if closing one of them fails.
     *
     * @throws JMSException if closing a connection fails
     */
    public void stop() throws JMSException {
        if (!this.started.commit(true, false)) {
            return;
        }
        this.stopped.set(true);
        this.stopBridges(this.topicConsumerMap);
        this.stopBridges(this.queueConsumerMap);
        try {
            if (this.remoteConnection != null) {
                this.remoteConnection.close();
            }
        }
        finally {
            this.remoteConnection = null;
            if (this.localConnection != null) {
                try {
                    this.localConnection.close();
                }
                finally {
                    this.localConnection = null;
                }
            }
        }
    }

    /**
     * Stops every bridge held in the given map and then empties the map. A
     * bridge that fails to stop is logged and does not prevent the remaining
     * bridges from being stopped.
     */
    private void stopBridges(ConcurrentHashMap bridges) {
        Iterator i = bridges.values().iterator();
        while (i.hasNext()) {
            NetworkMessageBridge bridge = (NetworkMessageBridge)i.next();
            try {
                bridge.stop();
            }
            catch (RuntimeException e) {
                log.warn("Failed to stop NetworkMessageBridge for destination: " + bridge.getDestination(), e);
            }
        }
        bridges.clear();
    }

    /**
     * Handles a consumer being started or stopped on the local broker.
     *
     * <p>Consumers on clustered connections, and consumer infos that have
     * already visited the remote broker, are ignored. When the channel is
     * connected the bridge bookkeeping is updated immediately; otherwise the
     * update is deferred to a pool task that waits until the channel is
     * connected, and is dropped if the channel is stopped in the meantime.
     *
     * @param client the broker client owning the consumer
     * @param info   describes the consumer and whether it is started
     */
    public void onConsumerInfo(final BrokerClient client, final ConsumerInfo info) {
        if (client.isClusteredConnection()) {
            return;
        }
        if (this.connected.get()) {
            if (!info.hasVisited(this.remoteBrokerName)) {
                this.applyConsumerInfo(info);
            }
            return;
        }
        try {
            this.threadPool.execute(new Runnable(){

                public void run() {
                    if (client.isClusteredConnection() || info.hasVisited(NetworkChannel.this.remoteBrokerName)) {
                        return;
                    }
                    boolean interrupted = false;
                    try {
                        synchronized (NetworkChannel.this.connected) {
                            while (!NetworkChannel.this.connected.get() && !NetworkChannel.this.stopped.get()) {
                                try {
                                    NetworkChannel.this.connected.wait(500L);
                                }
                                catch (InterruptedException e) {
                                    log.debug("interuppted", e);
                                    // keep waiting, but remember to restore the flag afterwards
                                    interrupted = true;
                                }
                            }
                            if (NetworkChannel.this.stopped.get()) {
                                // creating bridges now would reopen connections on a stopped channel
                                return;
                            }
                            NetworkChannel.this.applyConsumerInfo(info);
                        }
                    }
                    finally {
                        if (interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
        }
        catch (InterruptedException e) {
            log.warn("Failed to process ConsumerInfo: " + info, e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Registers the consumer if it is started, otherwise unregisters it.
     */
    private void applyConsumerInfo(ConsumerInfo info) {
        if (info.isStarted()) {
            this.addConsumerInfo(info);
        } else {
            this.removeConsumerInfo(info);
        }
    }

    /** @return the address of the remote broker */
    public String getUri() {
        return this.uri;
    }

    /** @param uri the address of the remote broker */
    public void setUri(String uri) {
        this.uri = uri;
    }

    /** @return the password used for the remote connection */
    public String getRemotePassword() {
        return this.remotePassword;
    }

    /** @param remotePassword the password used for the remote connection */
    public void setRemotePassword(String remotePassword) {
        this.remotePassword = remotePassword;
    }

    /** @return the user name used for the remote connection */
    public String getRemoteUserName() {
        return this.remoteUserName;
    }

    /** @param remoteUserName the user name used for the remote connection */
    public void setRemoteUserName(String remoteUserName) {
        this.remoteUserName = remoteUserName;
    }

    /** @return the container of the local broker */
    public BrokerContainer getBrokerContainer() {
        return this.brokerContainer;
    }

    /** @param brokerContainer the container of the local broker */
    public void setBrokerContainer(BrokerContainer brokerContainer) {
        this.brokerContainer = brokerContainer;
    }

    /** @return the retry limit handed to a composite transport channel */
    public int getMaximumRetries() {
        return this.maximumRetries;
    }

    /** @param maximumRetries the retry limit handed to a composite transport channel */
    public void setMaximumRetries(int maximumRetries) {
        this.maximumRetries = maximumRetries;
    }

    /** @return the failure sleep time handed to a composite transport channel */
    public long getReconnectSleepTime() {
        return this.reconnectSleepTime;
    }

    /** @param reconnectSleepTime the failure sleep time handed to a composite transport channel */
    public void setReconnectSleepTime(long reconnectSleepTime) {
        this.reconnectSleepTime = reconnectSleepTime;
    }

    /** @return the name of the remote broker, once known */
    public String getRemoteBrokerName() {
        return this.remoteBrokerName;
    }

    /** @param remoteBrokerName the name of the remote broker */
    public void setRemoteBrokerName(String remoteBrokerName) {
        this.remoteBrokerName = remoteBrokerName;
    }

    /** @return the executor used for background work */
    protected PooledExecutor getThreadPool() {
        return this.threadPool;
    }

    /** @param threadPool the executor used for background work */
    protected void setThreadPool(PooledExecutor threadPool) {
        this.threadPool = threadPool;
    }

    /**
     * Returns the local connection, creating it on first use.
     */
    private synchronized ActiveMQConnection getLocalConnection() throws JMSException {
        if (this.localConnection == null) {
            this.initializeLocal();
        }
        return this.localConnection;
    }

    /**
     * Returns the remote connection, creating it on first use.
     */
    private synchronized ActiveMQConnection getRemoteConnection() throws JMSException {
        if (this.remoteConnection == null) {
            this.initializeRemote();
        }
        return this.remoteConnection;
    }

    /** @return the prefetch policy applied to the local connection, or {@code null} */
    public ActiveMQPrefetchPolicy getLocalPrefetchPolicy() {
        return this.localPrefetchPolicy;
    }

    /** @param localPrefetchPolicy the prefetch policy applied to the local connection */
    public void setLocalPrefetchPolicy(ActiveMQPrefetchPolicy localPrefetchPolicy) {
        this.localPrefetchPolicy = localPrefetchPolicy;
    }

    /** @return the prefetch policy applied to the remote connection, or {@code null} */
    public ActiveMQPrefetchPolicy getRemotePrefetchPolicy() {
        return this.remotePrefetchPolicy;
    }

    /** @param remotePrefetchPolicy the prefetch policy applied to the remote connection */
    public void setRemotePrefetchPolicy(ActiveMQPrefetchPolicy remotePrefetchPolicy) {
        this.remotePrefetchPolicy = remotePrefetchPolicy;
    }

    /**
     * Mirrors remote connection advisories into the local broker container:
     * an open connection registers its client id, a closed one deregisters it.
     *
     * @param event the advisory describing the remote connection
     */
    public void onEvent(ConnectionAdvisoryEvent event) {
        String clientId = event.getInfo().getClientId();
        if (event.getInfo().isClosed()) {
            this.brokerContainer.deregisterRemoteClientID(clientId);
        } else {
            this.brokerContainer.registerRemoteClientID(clientId);
        }
    }

    /**
     * Registers one more consumer for the destination described by {@code info}.
     */
    private void addConsumerInfo(ConsumerInfo info) {
        this.addConsumerInfo(info.getDestination(), info.getDestination().isTopic(), info.isDurableTopic());
    }

    /**
     * Ensures a bridge exists for the destination and adds a reference to it.
     * An existing non-durable bridge is upgraded when a durable consumer arrives.
     */
    private void addConsumerInfo(ActiveMQDestination destination, boolean topic, boolean durableTopic) {
        ConcurrentHashMap map = topic ? this.topicConsumerMap : this.queueConsumerMap;
        NetworkMessageBridge bridge = (NetworkMessageBridge)map.get(destination.getPhysicalName());
        if (bridge == null) {
            bridge = this.createBridge(map, destination, durableTopic);
        } else if (durableTopic && !bridge.isDurableTopic()) {
            bridge.decrementReferenceCount();
            this.upgradeBridge(bridge);
        }
        bridge.incrementReferenceCount();
    }

    /**
     * Upgrades the bridge while the remote connection is stopped, then
     * restarts the remote connection whether or not the upgrade succeeded.
     */
    private void upgradeBridge(NetworkMessageBridge bridge) {
        try {
            this.remoteConnection.stop();
            bridge.upgrade();
        }
        catch (JMSException e) {
            log.warn("Could not upgrade the NetworkMessageBridge to a durable subscription for destination: " + bridge.getDestination(), e);
        }
        try {
            this.remoteConnection.start();
        }
        catch (JMSException e) {
            log.error("Failed to restart the NetworkMessageBridge", e);
        }
    }

    /**
     * Creates, registers and starts a bridge for the destination.
     *
     * <p>On failure the error is logged, the bridge is removed from the map
     * again so that a later consumer can retry, and the remote connection is
     * released. The (unstarted) bridge is still returned, never {@code null}.
     */
    private NetworkMessageBridge createBridge(Map map, ActiveMQDestination destination, boolean durableTopic) {
        NetworkMessageBridge bridge = new NetworkMessageBridge();
        try {
            bridge.setDestination(destination);
            bridge.setDurableTopic(durableTopic);
            bridge.setLocalBrokerName(this.brokerContainer.getBroker().getBrokerName());
            // acknowledge mode 2 is javax.jms.Session.CLIENT_ACKNOWLEDGE
            bridge.setLocalSession(this.getLocalConnection().createSession(false, 2));
            bridge.setRemoteSession(this.getRemoteConnection().createSession(false, 2));
            map.put(destination.getPhysicalName(), bridge);
            bridge.start();
            log.info("started NetworkMessageBridge for destination: " + destination + " -- NetworkChannel: " + this.toString());
        }
        catch (JMSException jmsEx) {
            log.error("Failed to start NetworkMessageBridge for destination: " + destination, jmsEx);
            // do not leave a bridge that never started registered under this name
            map.remove(destination.getPhysicalName());
            try {
                this.releaseRemote();
            }
            catch (JMSException e) {
                log.warn("Failed to release remote connection", e);
            }
        }
        return bridge;
    }

    /**
     * Drops one consumer reference from the destination's bridge. When the
     * count reaches zero the bridge is stopped and unregistered on the thread
     * pool. The bridge is looked up in the topic or queue map according to the
     * destination type, mirroring how it was registered.
     */
    private void removeConsumerInfo(final ConsumerInfo info) {
        final String physicalName = info.getDestination().getPhysicalName();
        final ConcurrentHashMap map = info.getDestination().isTopic() ? this.topicConsumerMap : this.queueConsumerMap;
        final NetworkMessageBridge bridge = (NetworkMessageBridge)map.get(physicalName);
        if (bridge == null || bridge.decrementReferenceCount() > 0) {
            return;
        }
        try {
            this.threadPool.execute(new Runnable(){

                public void run() {
                    bridge.stop();
                    map.remove(physicalName);
                    log.info("stopped MetworkMessageBridge for destination: " + info.getDestination());
                }
            });
        }
        catch (InterruptedException e) {
            log.warn("got interrupted stoping NetworkBridge", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * For channels that were not built on an existing transport, creates
     * bridges for every local destination known to the broker's four
     * container managers (persistent/transient topics and queues). Only
     * persistent topics are bridged as durable.
     */
    private void startSubscriptions() {
        if (this.remote) {
            return;
        }
        MessageContainerManager mcm = this.brokerContainer.getBroker().getPersistentTopicContainerManager();
        if (mcm != null) {
            this.startSubscriptions(mcm.getLocalDestinations(), true, true);
        }
        mcm = this.brokerContainer.getBroker().getTransientTopicContainerManager();
        if (mcm != null) {
            this.startSubscriptions(mcm.getLocalDestinations(), true, false);
        }
        mcm = this.brokerContainer.getBroker().getTransientQueueContainerManager();
        if (mcm != null) {
            this.startSubscriptions(mcm.getLocalDestinations(), false, false);
        }
        mcm = this.brokerContainer.getBroker().getPersistentQueueContainerManager();
        if (mcm != null) {
            this.startSubscriptions(mcm.getLocalDestinations(), false, false);
        }
    }

    /**
     * Adds a consumer reference for every destination in the map's values.
     *
     * @param destinations map whose values are {@link ActiveMQDestination}s; may be {@code null}
     */
    private void startSubscriptions(Map destinations, boolean topic, boolean durableTopic) {
        if (destinations == null) {
            return;
        }
        for (Iterator i = destinations.values().iterator(); i.hasNext(); ) {
            this.addConsumerInfo((ActiveMQDestination)i.next(), topic, durableTopic);
        }
    }

    /**
     * Opens the local and remote connections and subscribes this channel to
     * the local broker's consumer notifications.
     *
     * @throws JMSException if either connection cannot be established
     */
    protected void initialize() throws JMSException {
        this.initializeLocal();
        this.initializeRemote();
        this.brokerContainer.getBroker().addConsumerInfoListener(this);
    }

    /**
     * Creates the remote connection if there is none, exchanges broker
     * identities with the remote side, starts the connection advisor, and
     * finally marks the channel connected (also when the connection already
     * existed).
     */
    private synchronized void initializeRemote() throws JMSException {
        if (this.remoteConnection == null) {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.remoteUserName, this.remotePassword, this.uri);
            factory.setJ2EEcompliant(false);
            factory.setQuickClose(true);
            factory.setInternalConnection(true);
            this.remoteConnection = (ActiveMQConnection)factory.createConnection();
            TransportChannel transportChannel = this.remoteConnection.getTransportChannel();
            if (transportChannel instanceof CompositeTransportChannel) {
                CompositeTransportChannel composite = (CompositeTransportChannel)transportChannel;
                composite.setMaximumRetries(this.maximumRetries);
                composite.setFailureSleepTime(this.reconnectSleepTime);
                composite.setIncrementTimeout(false);
            }
            transportChannel.addTransportStatusEventListener(this);
            this.remoteConnection.setClientID(this.brokerContainer.getBroker().getBrokerName() + "_NetworkChannel");
            this.remoteConnection.start();
            BrokerInfo info = new BrokerInfo();
            info.setBrokerName(this.brokerContainer.getBroker().getBrokerName());
            info.setClusterName(this.brokerContainer.getBroker().getBrokerClusterName());
            // the receipt carries the remote broker's own name and cluster
            Receipt receipt = this.remoteConnection.syncSendRequest(info);
            if (receipt != null) {
                this.remoteBrokerName = receipt.getBrokerName();
                this.remoteClusterName = receipt.getClusterName();
            }
            this.connectionAdvisor = new ConnectionAdvisor(this.remoteConnection);
            this.connectionAdvisor.addListener(this);
            this.connectionAdvisor.start();
            if (this.remotePrefetchPolicy != null) {
                this.remoteConnection.setPrefetchPolicy(this.remotePrefetchPolicy);
            }
        }
        this.doSetConnected();
    }

    /**
     * Creates and starts a new in-VM connection to the local broker and sends
     * it the remote broker's name and cluster name.
     */
    private void initializeLocal() throws JMSException {
        String brokerName = this.brokerContainer.getBroker().getBrokerName();
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + brokerName);
        factory.setTurboBoost(true);
        factory.setJ2EEcompliant(false);
        factory.setBrokerName(brokerName);
        factory.setQuickClose(true);
        factory.setInternalConnection(true);
        this.localConnection = (ActiveMQConnection)factory.createConnection();
        this.localConnection.start();
        BrokerInfo info = new BrokerInfo();
        info.setBrokerName(this.remoteBrokerName);
        info.setClusterName(this.remoteClusterName);
        this.localConnection.asyncSendPacket(info);
        if (this.localPrefetchPolicy != null) {
            this.localConnection.setPrefetchPolicy(this.localPrefetchPolicy);
        }
    }

    /**
     * Stops the remote transport channel, the connection advisor and the
     * remote connection, then forgets the remote connection so that it is
     * re-created on next use. A failure to stop the connection is only logged.
     */
    private synchronized void releaseRemote() throws JMSException {
        if (this.remoteConnection == null) {
            return;
        }
        TransportChannel transportChannel = this.remoteConnection.getTransportChannel();
        transportChannel.stop();
        if (this.connectionAdvisor != null) {
            this.connectionAdvisor.stop();
        }
        try {
            this.remoteConnection.stop();
        }
        catch (JMSException ignored) {
            // best effort: the connection is being discarded anyway
            log.debug("Ignoring failure while stopping remote connection", ignored);
        }
        this.remoteConnection = null;
    }
}

