package com.solacesystems.jms;

import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jms.encoding.JMSDecoder;
import com.solacesystems.jms.impl.AckHandler;
import com.solacesystems.jms.impl.ConnectionConsumerNoTransactionStrategy;
import com.solacesystems.jms.impl.ConnectionConsumerTransactionStrategy;
import com.solacesystems.jms.impl.ConnectionConsumerXATransactionStrategy;
import com.solacesystems.jms.impl.ConsumerFactory;
import com.solacesystems.jms.impl.JMSState;
import com.solacesystems.jms.impl.MessageAckHandlerImpl;
import com.solacesystems.jms.impl.SessionProperties;
import com.solacesystems.jms.impl.SessionTransactionType;
import com.solacesystems.jms.impl.SolJMSErrorMessages;
import com.solacesystems.jms.impl.Validator;
import com.solacesystems.jms.message.SolMessage;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XASession;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/solacesystems/jms/SolConnectionConsumerRA.class */
public class SolConnectionConsumerRA implements SolConnectionConsumerIF {
    protected static final String Component = "ConnectionConsumer";
    static final Log log = LogFactory.getLog(SolConnectionConsumerRA.class);
    private final SolConnectionIF mConnection;
    private final SolDestinationInfo mDestInfo;
    private final ServerSessionPool mServerPool;
    private final int mMaxMessages;
    private final SessionProperties mSessionProps;
    private final AckHandler mAckHandler;
    private final ConnectionConsumerTransactionStrategy mAckTransaction;
    protected volatile JMSState mState;
    private volatile boolean sessionValidated = false;

    public SolConnectionConsumerRA(SolConnectionIF solConnectionIF, SolDestinationInfo solDestinationInfo, String str, ServerSessionPool serverSessionPool, int i, JMSState jMSState) throws JMSException {
        Validator.checkServerSessionPool(serverSessionPool);
        Validator.checkConnectionConsumerMaxMessages(i);
        this.mServerPool = serverSessionPool;
        this.mConnection = solConnectionIF;
        this.mDestInfo = solDestinationInfo;
        this.mState = jMSState;
        this.mSessionProps = constructSessionProperties(solConnectionIF, serverSessionPool);
        Validator.checkTransactedAndAckMode(this.mSessionProps.getTransactionType(), this.mSessionProps.getAcknowledgeMode(), this.mConnection.getProperties().getPropertyBean().getDirectTransport().booleanValue());
        Validator.checkTransactedAndLargeMessaging(this.mSessionProps.getTransactionType(), this.mConnection.getJCSMPProperties().getBooleanProperty(JCSMPProperties.LARGE_MESSAGING).booleanValue());
        ConsumerFactory consumerFactory = new ConsumerFactory(str, false, this.mSessionProps, solConnectionIF, null);
        this.mAckHandler = this.mSessionProps.getAckHandler();
        ((SolConnection) this.mConnection).addConnectionConsumer(this);
        ConnectionConsumerTransactionStrategy.InitProperties initProperties = new ConnectionConsumerTransactionStrategy.InitProperties();
        initProperties.withConnection(this.mConnection).withConsumerFactory(consumerFactory).withDestinationInfo(this.mDestInfo).withSessionProperties(this.mSessionProps);
        if (this.mSessionProps.getAcknowledgeMode() == 0) {
            this.mAckTransaction = new ConnectionConsumerXATransactionStrategy(initProperties);
        } else {
            this.mAckTransaction = new ConnectionConsumerNoTransactionStrategy(initProperties);
        }
        this.mMaxMessages = i;
        createConsumer();
        startWorkerThread();
        if (log.isDebugEnabled()) {
            log.debug("SolConnectionConsumerRA created. Connection: " + solConnectionIF.toString() + "   Destination: " + solDestinationInfo.toString());
        }
    }

    private SessionProperties constructSessionProperties(SolConnectionIF solConnectionIF, ServerSessionPool serverSessionPool) throws JMSException {
        boolean z = solConnectionIF instanceof XAConnection;
        int i = z ? 0 : 1;
        log.debug(String.format("Sessions in ServerSessionPool should be: XA:%s ackMode:%s", Boolean.valueOf(z), Integer.valueOf(i)));
        return new SessionProperties(solConnectionIF.getProperties(), z ? SessionTransactionType.XATransaction : SessionTransactionType.NoTransaction, i, new MessageAckHandlerImpl(i));
    }

    public ServerSessionPool getServerSessionPool() throws JMSException {
        return this.mServerPool;
    }

    @Override // com.solacesystems.jms.SolStartableIF
    public void start() throws JMSException {
        Validator.checkClosed(this.mState, Component);
        if (this.mState == JMSState.Stopped) {
            if (log.isDebugEnabled()) {
                log.debug("Starting Connection Consumer.");
            }
            this.mState = JMSState.Started;
            startConsumer();
            if (log.isDebugEnabled()) {
                log.debug("Connection consumer started.");
            }
        }
    }

    @Override // com.solacesystems.jms.SolStartableIF
    public void stop() throws JMSException {
        Validator.checkClosed(this.mState, Component);
        if (this.mState == JMSState.Started) {
            if (log.isDebugEnabled()) {
                log.debug("Stopping Connection Consumer.");
            }
            this.mState = JMSState.Stopped;
            stopConsumer();
            if (log.isDebugEnabled()) {
                log.debug("Connection Consumer stopped.");
            }
        }
    }

    private void startConsumer() throws JMSException {
        try {
            this.mAckTransaction.start();
        } catch (JMSException e) {
            deliverException(e);
            throw e;
        }
    }

    private void stopConsumer() throws JMSException {
        this.mAckTransaction.stop();
    }

    protected void createConsumer() throws JMSException {
        this.mAckTransaction.createConsumer();
        if (this.mState == JMSState.Started) {
            startConsumer();
        }
    }

    @Override // com.solacesystems.jms.SolCloseableIF
    public void close() throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug("Connection Consumer closed.");
        }
        this.mState = JMSState.Closed;
        this.mAckTransaction.closeConsumer();
        this.mAckTransaction.close();
        ((SolConnection) this.mConnection).removeConnectionConsumer(this);
    }

    public String toString() {
        return String.format("SolConnectionConsumerRA(Destination:%s SubscriptionName:%s SessionProperties:%s TransactionState:%s)", this.mDestInfo.destination, this.mDestInfo.subscriptionName, this.mSessionProps, this.mAckTransaction.getTxState());
    }

    protected SolMessage createMessage(BytesXMLMessage bytesXMLMessage) throws JMSException {
        if (bytesXMLMessage == null) {
            return null;
        }
        SolMessage createJMSMessage = JMSDecoder.createJMSMessage(bytesXMLMessage);
        createJMSMessage.setAckHandler(this.mAckHandler);
        this.mAckHandler.onMessageCreate(createJMSMessage);
        return createJMSMessage;
    }

    private void startWorkerThread() {
        Thread thread = new Thread(new Runnable() { // from class: com.solacesystems.jms.SolConnectionConsumerRA.1
            @Override // java.lang.Runnable
            public void run() {
                SolConnectionConsumerRA.this.mainLoop();
            }
        });
        String uuid = UUID.randomUUID().toString();
        thread.setName("SolConnectionConsumerRAWorker-" + uuid.substring(uuid.length() - 8));
        thread.setDaemon(true);
        thread.start();
    }

    void mainLoop() {
        ArrayList arrayList = new ArrayList(this.mMaxMessages);
        while (this.mState != JMSState.Closed) {
            try {
                if (this.mAckTransaction.getTxState() == ConnectionConsumerTransactionStrategy.TransactionState.ACTIVE || this.mAckTransaction.getTxState() == ConnectionConsumerTransactionStrategy.TransactionState.NONTRANSACTED) {
                    try {
                        arrayList.clear();
                        while (arrayList.size() < this.mMaxMessages) {
                            int size = arrayList.size();
                            pollMessage(arrayList);
                            if (arrayList.size() == size) {
                                break;
                            }
                        }
                        if (arrayList.size() > 0) {
                            deliver(arrayList);
                            this.mAckTransaction.afterDelivery();
                            arrayList.clear();
                        }
                    } catch (JCSMPException e) {
                        if (this.mState != JMSState.Closed) {
                            throw Validator.createJMSException(SolJMSErrorMessages.OP_RECV_OPERATION, e);
                        }
                    }
                }
                this.mAckTransaction.afterPollLoop();
            } catch (JMSException e2) {
                if (this.mState != JMSState.Closed) {
                    log.error("Exception occurred in receive loop, stop connection consumer - " + e2.getMessage(), e2);
                    try {
                        stop();
                    } catch (JMSException e3) {
                    }
                    deliverException(e2);
                } else {
                    log.debug("Got JMSException: \"" + e2.getMessage() + "\" in main loop but JMSState was already closed - ignoring");
                }
            } catch (Exception e4) {
                deliverException(Validator.createJMSException(null, e4));
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("SolConnectionConsumerRA.mainLoop() exit.");
        }
    }

    private void pollMessage(List<Message> list) throws JCSMPException, JMSException {
        BytesXMLMessage receive = list.size() == 0 ? this.mAckTransaction.getConsumer().receive(20) : this.mAckTransaction.getConsumer().receiveNoWait();
        if (receive != null) {
            list.add(createMessage(receive));
        }
    }

    private void deliver(Collection<Message> collection) throws JMSException {
        ArrayList arrayList = new ArrayList(collection);
        this.mAckTransaction.onMessage(arrayList);
        ServerSession serverSession = this.mServerPool.getServerSession();
        if (this.mState == JMSState.Closed) {
            log.debug("ConnectionConsumer got a server session in closed state");
            serverSession.start();
            throw new JMSException("Aborting deliver because ConnectionConsumer state has changed to closed.");
        }
        Session session = serverSession.getSession();
        try {
            if (!this.sessionValidated) {
                if (!(session instanceof SolSessionIF)) {
                    throw new ConfigurationException("Sessions in ServerSessionPool must be Solace JMS Session");
                }
                boolean transacted = session.getTransacted();
                int acknowledgeMode = session.getAcknowledgeMode();
                log.debug(String.format("deliverySession: transacted:%s ackMode:%s sniffSession:%s", Boolean.valueOf(transacted), Integer.valueOf(acknowledgeMode), session));
                if (!(this.mConnection instanceof XAConnection) && transacted) {
                    throw new ConfigurationException("Sessions in ServerSessionPool cannot be transacted Session or XASession if ConnectionConsumer is created from Connection");
                }
                if ((this.mConnection instanceof XAConnection) && !(session instanceof XASession)) {
                    throw new ConfigurationException("Sessions in ServerSessionPool must be XASession if ConnectionConsumer is created from XAConnection");
                }
                if (((SolSession) session).getConnection() != this.mConnection) {
                    throw new ConfigurationException("Sessions in ServerSessionPool must be created from the same Connection instance as ConnectionConsumer");
                }
                if (!transacted && acknowledgeMode == 2) {
                    throw new ConfigurationException("Sessions in ServerSessionPool cannot have client acknowledge mode");
                }
                this.sessionValidated = true;
            }
            ((SolSession) session).loadMessages(arrayList, this);
            serverSession.start();
        } catch (Throwable th) {
            serverSession.start();
            throw th;
        }
    }

    private void deliverException(JMSException jMSException) {
        ExceptionListener exceptionListener = this.mConnection.getProperties().getExceptionListener();
        log.debug(String.format("%s Delivering exception to connection exception listener (%s): %s", toString(), exceptionListener, jMSException.toString()));
        if (exceptionListener != null) {
            exceptionListener.onException(jMSException);
        }
    }

    @Override // com.solacesystems.jms.SolConnectionConsumerIF
    public void commitBatch(Collection<Message> collection) throws JMSException {
        throw new ConfigurationException("commitBatch call is unsupported by this implementation of ConnectionConsumer");
    }

    @Override // com.solacesystems.jms.SolConnectionConsumerIF
    public void rollbackBatch(Collection<Message> collection) throws JMSException {
        throw new ConfigurationException("rollbackBatch call is unsupported by this implementation of ConnectionConsumer");
    }

    public AckHandler getAckHandler() {
        return this.mAckHandler;
    }

    public ConnectionConsumerTransactionStrategy getAckTransaction() {
        return this.mAckTransaction;
    }
}
