/*
 * Decompiled with CFR 0.152.
 */
package org.codehaus.activemq.transport;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.ActiveMQConnection;
import org.codehaus.activemq.ActiveMQConnectionFactory;
import org.codehaus.activemq.advisories.ConnectionAdvisor;
import org.codehaus.activemq.advisories.ConnectionAdvisoryEvent;
import org.codehaus.activemq.advisories.ConnectionAdvisoryEventListener;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.broker.BrokerContainer;
import org.codehaus.activemq.broker.ConsumerInfoListener;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.BrokerInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.service.MessageContainerManager;
import org.codehaus.activemq.service.Service;
import org.codehaus.activemq.transport.NetworkConnector;
import org.codehaus.activemq.transport.NetworkMessageBridge;
import org.codehaus.activemq.transport.TransportChannel;
import org.codehaus.activemq.transport.TransportStatusEvent;
import org.codehaus.activemq.transport.TransportStatusEventListener;
import org.codehaus.activemq.transport.composite.CompositeTransportChannel;

/**
 * Represents a network link from the local broker to one remote broker.
 *
 * <p>The channel owns a "remote" connection (to the other broker) and a
 * "local" connection (a {@code vm://} connection to the embedded broker) and
 * keeps one {@link NetworkMessageBridge} per destination physical name in
 * {@link #consumerMap}. Bridges are created when the local broker reports a
 * started consumer and are reference counted so they can be torn down when
 * the last interested consumer goes away.
 */
public class NetworkChannel
implements Service,
ConsumerInfoListener,
ConnectionAdvisoryEventListener,
TransportStatusEventListener {
    private static final Log log = LogFactory.getLog(NetworkChannel.class);

    /** URI of the remote broker; empty for channels built around an existing transport. */
    protected String uri;
    protected BrokerContainer brokerContainer;
    /** Lazily created {@code vm://} connection to the local broker. */
    protected ActiveMQConnection localConnection;
    /** Connection to the remote broker; created eagerly or lazily depending on the constructor used. */
    protected ActiveMQConnection remoteConnection;
    /** Maps destination physical name (String) to its {@link NetworkMessageBridge}. */
    protected ConcurrentHashMap consumerMap;
    protected String remoteUserName;
    protected String remotePassword;
    protected String remoteBrokerName;
    protected String remoteClusterName;
    protected int maximumRetries = 0;
    protected long reconnectSleepTime = 2000L;
    protected PooledExecutor threadPool;
    private SynchronizedBoolean started = new SynchronizedBoolean(false);
    private SynchronizedBoolean connected = new SynchronizedBoolean(false);
    private ConnectionAdvisor connectionAdvisor;

    /**
     * Creates an unconfigured channel; the broker container, URI and thread
     * pool must be supplied through the setters before {@link #start()}.
     */
    public NetworkChannel() {
        this.consumerMap = new ConcurrentHashMap();
    }

    /**
     * Creates a channel that runs its asynchronous work on the given pool.
     *
     * @param tp executor used by {@link #start()} and {@link #onConsumerInfo}
     */
    public NetworkChannel(PooledExecutor tp) {
        this();
        this.threadPool = tp;
    }

    /**
     * Creates a channel that will connect to {@code uri} when started.
     *
     * @param connector       supplies the shared thread pool
     * @param brokerContainer container of the local broker
     * @param uri             URI of the remote broker
     */
    public NetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, String uri) {
        this(connector.threadPool);
        this.brokerContainer = brokerContainer;
        this.uri = uri;
    }

    /**
     * Creates a channel on top of an already established transport channel.
     * The remote connection is created and started immediately, and the local
     * broker's name and cluster name are sent to the peer over the transport.
     *
     * @param connector         supplies the shared thread pool
     * @param brokerContainer   container of the local broker
     * @param channel           established transport to the remote broker
     * @param remoteBrokerName  name of the remote broker
     * @param remoteclusterName cluster name of the remote broker
     * @throws JMSException if the remote connection cannot be configured or started
     */
    public NetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, TransportChannel channel, String remoteBrokerName, String remoteclusterName) throws JMSException {
        this(connector.threadPool);
        this.brokerContainer = brokerContainer;
        this.uri = "";
        this.remoteBrokerName = remoteBrokerName;
        this.remoteClusterName = remoteclusterName;

        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory();
        fac.setJ2EEcompliant(false);
        fac.setTurboBoost(true);
        // remoteUserName / remotePassword are still unset (null) at this point.
        this.remoteConnection = new ActiveMQConnection(fac, this.remoteUserName, this.remotePassword, channel);
        this.remoteConnection.setClientID("Boondocks:" + this.remoteClusterName + ":" + remoteBrokerName);
        this.remoteConnection.setQuickClose(true);
        this.remoteConnection.start();

        BrokerInfo info = new BrokerInfo();
        info.setBrokerName(brokerContainer.getBroker().getBrokerName());
        info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
        channel.asyncSend(info);
    }

    /**
     * Tracks the transport state: the channel is considered connected only
     * while the last reported status is {@code 1}.
     *
     * @param event status notification from the remote transport; {@code null} is treated as disconnected
     */
    public void statusChanged(TransportStatusEvent event) {
        // NOTE(review): 1 is presumably the "connected" status constant of
        // TransportStatusEvent inlined by the decompiler - confirm.
        this.connected.set(event != null && event.getChannelStatus() == 1);
    }

    public String toString() {
        return "NetworkChannel{ , uri = '" + this.uri + "' " + ", remoteBrokerName = '" + this.remoteBrokerName + "' " + " }";
    }

    /**
     * Starts the channel asynchronously. The first call schedules a task on
     * the thread pool that initializes both connections, registers this
     * channel as a consumer-info listener on the local broker and creates
     * bridges for the destinations the broker already knows. Subsequent calls
     * are no-ops until {@link #stop()} is called.
     */
    public void start() {
        // commit(false, true) flips the flag only if it is currently false.
        if (!this.started.commit(false, true)) {
            return;
        }
        try {
            this.threadPool.execute(new Runnable() {

                public void run() {
                    try {
                        NetworkChannel.this.initialize();
                        NetworkChannel.this.brokerContainer.getBroker().addConsumerInfoListener(NetworkChannel.this);
                        NetworkChannel.this.startSubscriptions();
                        log.info("Started NetworkChannel to " + NetworkChannel.this.uri);
                    }
                    catch (JMSException jmsEx) {
                        log.error("Failed to start NetworkChannel: " + NetworkChannel.this.uri, jmsEx);
                    }
                }
            });
        }
        catch (InterruptedException e) {
            log.warn("Failed to start - interuppted", e);
            // Preserve the interrupt so the caller can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops the channel if it was started: stops every bridge, forgets them,
     * and closes the remote and local connections.
     *
     * @throws JMSException if closing a connection fails
     */
    public void stop() throws JMSException {
        if (!this.started.commit(true, false)) {
            return;
        }
        // Stop the bridges BEFORE clearing the map (clearing first would leave
        // nothing to iterate) and while the connections that own their
        // sessions are still open.
        Iterator bridges = this.consumerMap.values().iterator();
        while (bridges.hasNext()) {
            NetworkMessageBridge bridge = (NetworkMessageBridge) bridges.next();
            bridge.stop();
        }
        this.consumerMap.clear();

        try {
            if (this.remoteConnection != null) {
                this.remoteConnection.close();
                this.remoteConnection = null;
            }
        }
        finally {
            // Always attempt to release the local connection, even if closing
            // the remote one failed.
            if (this.localConnection != null) {
                this.localConnection.close();
                this.localConnection = null;
            }
        }
    }

    /**
     * Reacts to a consumer being started or stopped on the local broker.
     * While the channel is connected the event is handled on the calling
     * thread; otherwise it is handed to the thread pool so the caller is not
     * held up by work that may need to (re)establish connections.
     *
     * @param client the broker client that owns the consumer
     * @param info   description of the consumer
     */
    public void onConsumerInfo(final BrokerClient client, final ConsumerInfo info) {
        if (this.connected.get()) {
            this.processConsumerInfo(client, info);
            return;
        }
        try {
            this.threadPool.execute(new Runnable() {

                public void run() {
                    NetworkChannel.this.processConsumerInfo(client, info);
                }
            });
        }
        catch (InterruptedException e) {
            log.warn("Failed to process ConsumerInfo: " + info, e);
            // Preserve the interrupt so the caller can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Adds or removes bridge interest for a consumer, ignoring consumers that
     * belong to clustered connections or whose info has already visited the
     * remote broker.
     */
    private void processConsumerInfo(BrokerClient client, ConsumerInfo info) {
        if (client.isClusteredConnection() || info.hasVisited(this.remoteBrokerName)) {
            return;
        }
        if (info.isStarted()) {
            this.addConsumerInfo(info);
        } else {
            this.removeConsumerInfo(info);
        }
    }

    /** @return the URI of the remote broker */
    public String getUri() {
        return this.uri;
    }

    /** @param uri the URI of the remote broker */
    public void setUri(String uri) {
        this.uri = uri;
    }

    /** @return the password used for the remote connection */
    public String getRemotePassword() {
        return this.remotePassword;
    }

    /** @param remotePassword the password used for the remote connection */
    public void setRemotePassword(String remotePassword) {
        this.remotePassword = remotePassword;
    }

    /** @return the user name used for the remote connection */
    public String getRemoteUserName() {
        return this.remoteUserName;
    }

    /** @param remoteUserName the user name used for the remote connection */
    public void setRemoteUserName(String remoteUserName) {
        this.remoteUserName = remoteUserName;
    }

    /** @return the container of the local broker */
    public BrokerContainer getBrokerContainer() {
        return this.brokerContainer;
    }

    /** @param brokerContainer the container of the local broker */
    public void setBrokerContainer(BrokerContainer brokerContainer) {
        this.brokerContainer = brokerContainer;
    }

    /** @return the retry limit applied to a composite remote transport */
    public int getMaximumRetries() {
        return this.maximumRetries;
    }

    /** @param maximumRetries the retry limit applied to a composite remote transport */
    public void setMaximumRetries(int maximumRetries) {
        this.maximumRetries = maximumRetries;
    }

    /** @return the failure sleep time applied to a composite remote transport */
    public long getReconnectSleepTime() {
        return this.reconnectSleepTime;
    }

    /** @param reconnectSleepTime the failure sleep time applied to a composite remote transport */
    public void setReconnectSleepTime(long reconnectSleepTime) {
        this.reconnectSleepTime = reconnectSleepTime;
    }

    /** @return the name of the remote broker, if known */
    public String getRemoteBrokerName() {
        return this.remoteBrokerName;
    }

    /** @param remoteBrokerName the name of the remote broker */
    public void setRemoteBrokerName(String remoteBrokerName) {
        this.remoteBrokerName = remoteBrokerName;
    }

    protected PooledExecutor getThreadPool() {
        return this.threadPool;
    }

    protected void setThreadPool(PooledExecutor threadPool) {
        this.threadPool = threadPool;
    }

    /**
     * Returns the connection to the local broker, creating it on first use.
     *
     * @return the local connection
     * @throws JMSException if the connection cannot be created or started
     */
    public synchronized ActiveMQConnection getLocalConnection() throws JMSException {
        if (this.localConnection == null) {
            this.initializeLocal();
        }
        return this.localConnection;
    }

    /**
     * Returns the connection to the remote broker, creating it on first use.
     *
     * @return the remote connection
     * @throws JMSException if the connection cannot be created or started
     */
    public synchronized ActiveMQConnection getRemoteConnection() throws JMSException {
        if (this.remoteConnection == null) {
            this.initializeRemote();
        }
        return this.remoteConnection;
    }

    /**
     * Mirrors remote connection advisories into the broker container: a
     * client id is registered while its connection is open and deregistered
     * once the advisory reports it closed.
     *
     * @param event connection advisory received from the remote broker
     */
    public void onEvent(ConnectionAdvisoryEvent event) {
        if (event.getInfo().isClosed()) {
            this.brokerContainer.deregisterRemoteClientID(event.getInfo().getClientId());
        } else {
            this.brokerContainer.registerRemoteClientID(event.getInfo().getClientId());
        }
    }

    private void addConsumerInfo(ConsumerInfo info) {
        this.addConsumerInfo(info.getDestination(), info.isDurableTopic());
    }

    /**
     * Registers one more interested consumer for {@code destination},
     * creating the bridge if none exists yet and upgrading an existing
     * non-durable bridge when a durable subscriber shows up.
     */
    private void addConsumerInfo(ActiveMQDestination destination, boolean durableTopic) {
        NetworkMessageBridge bridge = (NetworkMessageBridge) this.consumerMap.get(destination.getPhysicalName());
        if (bridge == null) {
            bridge = this.createBridge(destination, durableTopic);
        } else if (durableTopic && !bridge.isDurableTopic()) {
            bridge.decrementReferenceCount();
            this.upgradeBridge(bridge);
        }
        bridge.incrementReferenceCount();
    }

    /**
     * Upgrades a bridge with the remote connection stopped, then restarts the
     * remote connection regardless of whether the upgrade succeeded.
     */
    private void upgradeBridge(NetworkMessageBridge bridge) {
        try {
            this.remoteConnection.stop();
            bridge.upgrade();
        }
        catch (JMSException e) {
            log.warn("Could not upgrade the NetworkMessageBridge to a durable subscription for destination: " + bridge.getDestination(), e);
        }
        try {
            this.remoteConnection.start();
        }
        catch (JMSException e) {
            log.error("Failed to restart the NetworkMessageBridge", e);
        }
    }

    /**
     * Builds, registers and starts a bridge for {@code destination}.
     *
     * @return the new bridge; it is returned even when setup failed, in
     *         which case the failure has only been logged
     */
    private NetworkMessageBridge createBridge(ActiveMQDestination destination, boolean durableTopic) {
        NetworkMessageBridge bridge = new NetworkMessageBridge();
        try {
            bridge.setDestination(destination);
            bridge.setDurableTopic(durableTopic);
            bridge.setLocalBrokerName(this.brokerContainer.getBroker().getBrokerName());
            bridge.setLocalSession(this.getLocalConnection().createSession(false, javax.jms.Session.CLIENT_ACKNOWLEDGE));
            bridge.setRemoteSession(this.getRemoteConnection().createSession(false, javax.jms.Session.CLIENT_ACKNOWLEDGE));
            this.consumerMap.put(destination.getPhysicalName(), bridge);
            bridge.start();
            log.info("started NetworkMessageBridge for destination: " + destination + " -- NetworkChannel: " + this.toString());
        }
        catch (JMSException jmsEx) {
            // NOTE(review): a bridge whose start() failed stays registered in
            // consumerMap; confirm whether it should be removed for retry.
            log.error("Failed to start NetworkMessageBridge for destination: " + destination, jmsEx);
        }
        return bridge;
    }

    /**
     * Drops one consumer's interest in its destination. When the reference
     * count reaches zero for a non-durable bridge on a topic or temporary
     * destination, the bridge is stopped and unregistered on a daemon thread.
     */
    private void removeConsumerInfo(final ConsumerInfo info) {
        final String physicalName = info.getDestination().getPhysicalName();
        final NetworkMessageBridge bridge = (NetworkMessageBridge) this.consumerMap.get(physicalName);
        if (bridge == null) {
            return;
        }
        // The decrement must happen for every known bridge, so it is evaluated
        // before the remaining (side-effect free) eligibility checks.
        boolean unused = bridge.decrementReferenceCount() <= 0;
        if (unused && !bridge.isDurableTopic() && (bridge.getDestination().isTopic() || bridge.getDestination().isTemporary())) {
            Thread runner = new Thread(new Runnable() {

                public void run() {
                    bridge.stop();
                    NetworkChannel.this.consumerMap.remove(physicalName);
                    log.info("stopped NetworkMessageBridge for destination: " + info.getDestination());
                }
            });
            runner.setDaemon(true);
            runner.start();
        }
    }

    /**
     * Creates bridges for every destination the local broker's container
     * managers report: durable for the persistent topic manager, non-durable
     * for all other managers.
     */
    private void startSubscriptions() {
        MessageContainerManager durableTopicMCM = this.brokerContainer.getBroker().getPersistentTopicContainerManager();
        if (durableTopicMCM != null) {
            this.startSubscriptions(durableTopicMCM.getLocalDestinations(), true);
        }
        Iterator managers = this.brokerContainer.getBroker().getContainerManagerMap().values().iterator();
        while (managers.hasNext()) {
            MessageContainerManager mcm = (MessageContainerManager) managers.next();
            // The durable topic manager was already handled above.
            if (mcm != durableTopicMCM) {
                this.startSubscriptions(mcm.getLocalDestinations(), false);
            }
        }
    }

    /**
     * Registers bridge interest for each destination in the map's values.
     *
     * @param destinations map whose values are {@link ActiveMQDestination}s; may be {@code null}
     * @param durableTopic whether the bridges should be durable
     */
    private void startSubscriptions(Map destinations, boolean durableTopic) {
        if (destinations == null) {
            return;
        }
        Iterator i = destinations.values().iterator();
        while (i.hasNext()) {
            this.addConsumerInfo((ActiveMQDestination) i.next(), durableTopic);
        }
    }

    /**
     * Establishes the remote connection and then the local one; each is
     * skipped if it already exists.
     *
     * @throws JMSException if either connection cannot be set up
     */
    protected void initialize() throws JMSException {
        this.initializeRemote();
        this.initializeLocal();
    }

    /**
     * Creates and starts the remote connection if there is none yet,
     * exchanges broker info with the peer and starts listening for its
     * connection advisories.
     */
    private synchronized void initializeRemote() throws JMSException {
        if (this.remoteConnection != null) {
            return;
        }
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.remoteUserName, this.remotePassword, this.uri);
        factory.setTurboBoost(true);
        factory.setJ2EEcompliant(false);
        factory.setQuickClose(true);
        factory.setInternalConnection(true);
        this.remoteConnection = (ActiveMQConnection) factory.createConnection();

        TransportChannel transportChannel = this.remoteConnection.getTransportChannel();
        if (transportChannel instanceof CompositeTransportChannel) {
            // Apply this channel's retry settings to the composite transport.
            CompositeTransportChannel composite = (CompositeTransportChannel) transportChannel;
            composite.setMaximumRetries(this.maximumRetries);
            composite.setFailureSleepTime(this.reconnectSleepTime);
            composite.setIncrementTimeout(false);
        }
        transportChannel.addTransportStatusEventListener(this);
        this.remoteConnection.setClientID(this.brokerContainer.getBroker().getBrokerName() + "_NetworkChannel");
        this.remoteConnection.start();

        BrokerInfo info = new BrokerInfo();
        info.setBrokerName(this.brokerContainer.getBroker().getBrokerName());
        info.setClusterName(this.brokerContainer.getBroker().getBrokerClusterName());
        Receipt receipt = this.remoteConnection.syncSendRequest(info);
        if (receipt != null) {
            // The receipt carries the peer's broker and cluster names.
            this.remoteBrokerName = receipt.getBrokerName();
            this.remoteClusterName = receipt.getClusterName();
        }

        this.connectionAdvisor = new ConnectionAdvisor(this.remoteConnection);
        this.connectionAdvisor.addListener(this);
        this.connectionAdvisor.start();
        this.connected.set(true);
    }

    /**
     * Creates and starts the {@code vm://} connection to the local broker if
     * there is none yet, and sends it the remote broker's name and cluster
     * name. Synchronized and guarded like {@link #initializeRemote()} so that
     * {@link #initialize()} and {@link #getLocalConnection()} cannot both
     * create a connection.
     */
    private synchronized void initializeLocal() throws JMSException {
        if (this.localConnection != null) {
            return;
        }
        String brokerName = this.brokerContainer.getBroker().getBrokerName();
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + brokerName);
        factory.setTurboBoost(true);
        factory.setJ2EEcompliant(false);
        factory.setBrokerName(brokerName);
        factory.setQuickClose(true);
        factory.setInternalConnection(true);
        this.localConnection = (ActiveMQConnection) factory.createConnection();
        this.localConnection.start();

        BrokerInfo info = new BrokerInfo();
        info.setBrokerName(this.remoteBrokerName);
        info.setClusterName(this.remoteClusterName);
        this.localConnection.asyncSendPacket(info);
    }
}

