/*
 * Decompiled with CFR 0.152.
 */
package org.objectweb.joram.client.jms;

import fr.dyade.aaa.common.Daemon;
import fr.dyade.aaa.common.Debug;
import fr.dyade.aaa.common.Queue;
import javax.jms.ConnectionConsumer;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import org.objectweb.joram.client.jms.Connection;
import org.objectweb.joram.client.jms.Message;
import org.objectweb.joram.client.jms.MessageConsumerListener;
import org.objectweb.joram.client.jms.Session;
import org.objectweb.joram.client.jms.XASession;
import org.objectweb.joram.client.jms.connection.RequestMultiplexer;
import org.objectweb.joram.shared.client.ConsumerMessages;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

public final class MultiSessionConsumer
extends MessageConsumerListener
implements ConnectionConsumer {
    private static final Logger logger = Debug.getLogger((String)MultiSessionConsumer.class.getName());
    private ServerSessionPool serverSessionPool;
    private Connection cnx;
    private int maxMsgs;
    private Queue repliesIn;
    private int nbActivatedListeners;
    private MessageDispatcher msgDispatcher;

    MultiSessionConsumer(boolean queueMode, boolean durable, String selector, String destName, String targetName, ServerSessionPool sessionPool, int queueMessageReadMax, int topicActivationThreshold, int topicPassivationThreshold, int topicAckBufferMax, RequestMultiplexer reqMultiplexer, Connection connection, int maxMessages) {
        super(queueMode, durable, selector, destName, targetName, null, queueMessageReadMax, topicActivationThreshold, topicPassivationThreshold, topicAckBufferMax, reqMultiplexer);
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "MultiSessionConsumer.<init>(" + queueMode + ',' + durable + ',' + selector + ',' + destName + ',' + targetName + ',' + sessionPool + ',' + queueMessageReadMax + ',' + topicActivationThreshold + ',' + topicPassivationThreshold + ',' + topicAckBufferMax + ',' + reqMultiplexer + ',' + maxMessages + ')');
        }
        this.serverSessionPool = sessionPool;
        this.cnx = connection;
        this.maxMsgs = maxMessages;
        if (this.maxMsgs < 1) {
            this.maxMsgs = 1;
        }
        this.msgDispatcher = new MessageDispatcher("MessageDispatcher[" + reqMultiplexer.getDemultiplexerDaemonName() + ']');
        this.repliesIn = new Queue();
        this.msgDispatcher.setDaemon(true);
        this.msgDispatcher.start();
    }

    @Override
    public void pushMessages(ConsumerMessages cm) throws JMSException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "MultiSessionConsumer.pushMessages(" + cm + ')');
        }
        this.repliesIn.push((Object)cm);
    }

    public ServerSessionPool getServerSessionPool() throws JMSException {
        return this.serverSessionPool;
    }

    @Override
    public void close() throws JMSException {
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "MultiSessionConsumer.close()");
        }
        this.msgDispatcher.stop();
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "MultiSessionConsumer -> dispatcher stopped");
        }
        super.close();
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "MultiSessionConsumer -> close connection consumer");
        }
        this.cnx.closeConnectionConsumer(this);
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "MultiSessionConsumer -> connection consumer closed");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onMessage(Message msg, MessageListener listener, int ackMode) throws JMSException {
        MultiSessionConsumer multiSessionConsumer;
        if (logger.isLoggable(BasicLevel.DEBUG)) {
            logger.log(BasicLevel.DEBUG, "MultiSessionConsumer.onMessage(" + msg + ')');
        }
        try {
            multiSessionConsumer = this;
            synchronized (multiSessionConsumer) {
                if (this.getStatus() == 3) {
                    throw new IllegalStateException("Message listener closed");
                }
                if (this.nbActivatedListeners == 0) {
                    this.setStatus(2);
                }
                ++this.nbActivatedListeners;
            }
            this.activateListener(msg, listener, ackMode);
        }
        finally {
            multiSessionConsumer = this;
            synchronized (multiSessionConsumer) {
                --this.nbActivatedListeners;
                if (this.nbActivatedListeners == 0) {
                    this.setStatus(1);
                    this.notifyAll();
                }
            }
        }
    }

    @Override
    protected boolean checkSessionThread() {
        return false;
    }

    class MessageDispatcher
    extends Daemon {
        MessageDispatcher(String name) {
            super(name, logger);
        }

        protected void close() {
        }

        protected void shutdown() {
        }

        public void stop() {
            if (logger.isLoggable(BasicLevel.DEBUG)) {
                logger.log(BasicLevel.DEBUG, "MessageDispatcher.stop()");
            }
            if (this.isCurrentThread()) {
                this.finish();
            } else {
                super.stop();
            }
        }

        private Session getSession(ServerSession serverSession) throws JMSException {
            Session session = null;
            javax.jms.Session obj = serverSession.getSession();
            if (obj instanceof Session) {
                session = (Session)obj;
            } else if (obj instanceof XASession) {
                session = ((XASession)obj).sess;
            } else {
                throw new Error("Unexpected session type: " + obj);
            }
            return session;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            try {
                while (this.running) {
                    this.canStop = true;
                    ConsumerMessages cm = null;
                    try {
                        cm = (ConsumerMessages)MultiSessionConsumer.this.repliesIn.get();
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    if (!this.running) {
                        break;
                    }
                    if (cm == null || cm.getMessages().isEmpty()) continue;
                    this.canStop = false;
                    ServerSession serverSession = MultiSessionConsumer.this.serverSessionPool.getServerSession();
                    Session session = this.getSession(serverSession);
                    session.setMessageConsumerListener(MultiSessionConsumer.this);
                    int sessionMsgCounter = 0;
                    for (org.objectweb.joram.shared.messages.Message msg : cm.getMessages()) {
                        if (sessionMsgCounter >= MultiSessionConsumer.this.maxMsgs) {
                            serverSession.start();
                            serverSession = MultiSessionConsumer.this.serverSessionPool.getServerSession();
                            session = this.getSession(serverSession);
                            session.setMessageConsumerListener(MultiSessionConsumer.this);
                            sessionMsgCounter = 0;
                        }
                        session.onMessage(msg);
                        ++sessionMsgCounter;
                    }
                    serverSession.start();
                    MultiSessionConsumer.this.repliesIn.pop();
                }
            }
            catch (JMSException exc) {
                if (logger.isLoggable(BasicLevel.DEBUG)) {
                    logger.log(BasicLevel.WARN, "MultiSessionConsumer exits", (Throwable)exc);
                } else {
                    logger.log(BasicLevel.WARN, "MultiSessionConsumer exits: " + exc.getMessage());
                }
            }
            finally {
                try {
                    MultiSessionConsumer.this.close();
                }
                catch (JMSException jMSException) {}
                this.finish();
            }
        }
    }
}

