/*
 * Decompiled with CFR 0.152.
 */
package com.swiftmq.jms.v750;

import com.swiftmq.jms.ExceptionConverter;
import com.swiftmq.jms.MessageImpl;
import com.swiftmq.jms.SwiftMQMessageConsumer;
import com.swiftmq.jms.smqp.v750.AcknowledgeMessageRequest;
import com.swiftmq.jms.smqp.v750.AsyncMessageDeliveryRequest;
import com.swiftmq.jms.smqp.v750.CloseConsumerRequest;
import com.swiftmq.jms.smqp.v750.MessageDeliveredRequest;
import com.swiftmq.jms.smqp.v750.StartConsumerRequest;
import com.swiftmq.jms.v750.CloseConsumer;
import com.swiftmq.jms.v750.Recreatable;
import com.swiftmq.jms.v750.SessionImpl;
import com.swiftmq.swiftlet.queue.MessageEntry;
import com.swiftmq.swiftlet.queue.MessageIndex;
import com.swiftmq.tools.collection.RingBuffer;
import com.swiftmq.tools.collection.RingBufferThreadsafe;
import com.swiftmq.tools.concurrent.Semaphore;
import com.swiftmq.tools.requestreply.Reply;
import com.swiftmq.tools.requestreply.Request;
import com.swiftmq.tools.requestreply.RequestRegistry;
import com.swiftmq.tools.requestreply.RequestRetryValidator;
import com.swiftmq.tools.requestreply.ValidationException;
import com.swiftmq.tools.util.IdGenerator;
import com.swiftmq.tools.util.UninterruptableWaiter;
import jakarta.jms.IllegalStateException;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MessageConsumerImpl
implements MessageConsumer,
SwiftMQMessageConsumer,
Recreatable,
RequestRetryValidator {
    final String uniqueConsumerId = IdGenerator.getInstance().nextId('/');
    final AtomicBoolean closed = new AtomicBoolean(false);
    final AtomicInteger consumerId = new AtomicInteger();
    final AtomicBoolean transacted = new AtomicBoolean(false);
    final AtomicInteger acknowledgeMode = new AtomicInteger();
    RequestRegistry requestRegistry = null;
    String messageSelector = null;
    MessageListener messageListener = null;
    SessionImpl mySession = null;
    int serverQueueConsumerId = -1;
    boolean useThreadContextCL = false;
    final AtomicBoolean cancelled = new AtomicBoolean(false);
    RingBuffer messageCache = null;
    final AtomicBoolean doAck = new AtomicBoolean(false);
    final AtomicBoolean reportDelivered = new AtomicBoolean(false);
    final AtomicBoolean recordLog = new AtomicBoolean(true);
    final AtomicBoolean receiverWaiting = new AtomicBoolean(false);
    final AtomicBoolean wasRecovered = new AtomicBoolean(false);
    final AtomicBoolean fillCachePending = new AtomicBoolean(false);
    final AtomicBoolean receiveNoWaitFirstCall = new AtomicBoolean(true);
    final AtomicBoolean consumerStarted = new AtomicBoolean(false);
    final Lock lock = new ReentrantLock();
    final Lock fillCacheLock = new ReentrantLock();
    final UninterruptableWaiter waiter = new UninterruptableWaiter(this.lock);

    public MessageConsumerImpl(boolean transacted, int acknowledgeMode, RequestRegistry requestRegistry, String messageSelector, SessionImpl session) {
        this.transacted.set(transacted);
        this.acknowledgeMode.set(acknowledgeMode);
        this.requestRegistry = requestRegistry;
        this.messageSelector = messageSelector;
        this.mySession = session;
        this.useThreadContextCL = this.mySession.getMyConnection().isUseThreadContextCL();
        this.reportDelivered.set(transacted || acknowledgeMode == 2);
        this.messageCache = new RingBufferThreadsafe(this.mySession.getMyConnection().getSmqpConsumerCacheSize());
    }

    @Override
    public Request getRecreateRequest() {
        return null;
    }

    @Override
    public void setRecreateReply(Reply reply) {
    }

    @Override
    public List getRecreatables() {
        return null;
    }

    @Override
    public void validate(Request request) throws ValidationException {
        request.setDispatchId(this.mySession.dispatchId);
        if (request instanceof CloseConsumerRequest) {
            CloseConsumerRequest r = (CloseConsumerRequest)request;
            r.setSessionDispatchId(this.mySession.dispatchId);
            r.setQueueConsumerId(this.serverQueueConsumerId);
        } else {
            request.setCancelledByValidator(true);
        }
    }

    protected void verifyState() throws JMSException {
        if (this.closed.get()) {
            throw new IllegalStateException("Message consumer is closed");
        }
        this.mySession.verifyState();
    }

    public boolean isConsumerStarted() {
        return this.consumerStarted.get();
    }

    void setWasRecovered(boolean wasRecovered) {
        this.wasRecovered.set(wasRecovered);
    }

    void setDoAck(boolean doAck) {
        this.doAck.set(doAck);
    }

    public void setRecordLog(boolean recordLog) {
        this.recordLog.set(recordLog);
    }

    void addToCache(AsyncMessageDeliveryRequest request) {
        if (this.isClosed()) {
            return;
        }
        if (request.isRequiresRestart()) {
            this.fillCachePending.set(false);
        }
        this.messageCache.add(request);
    }

    void addToCache(AsyncMessageDeliveryRequest[] requests, boolean lastRestartRequired) {
        for (int i = 0; i < requests.length; ++i) {
            if (lastRestartRequired && i == requests.length - 1) {
                requests[i].setRequiresRestart(true);
            }
            this.addToCache(requests[i]);
        }
    }

    boolean invokeConsumer() {
        boolean shouldSignal = false;
        this.lock.lock();
        try {
            if (this.messageCache.getSize() > 0) {
                if (this.messageListener == null) {
                    if (this.receiverWaiting.get()) {
                        this.receiverWaiting.set(false);
                        shouldSignal = true;
                    }
                } else {
                    this.invokeMessageListener();
                }
            }
        }
        finally {
            this.lock.unlock();
        }
        if (shouldSignal) {
            this.waiter.signal();
        }
        return this.messageCache.getSize() > 0 && (this.messageListener != null || this.receiverWaiting.get()) && !this.isClosed();
    }

    void fillCache(boolean force) {
        this.fillCacheLock.lock();
        try {
            if (this.isClosed() || this.fillCachePending.get() && !force) {
                return;
            }
            this.fillCachePending.set(true);
            this.consumerStarted.set(true);
            this.requestRegistry.request(new StartConsumerRequest(this, this.mySession.dispatchId, this.serverQueueConsumerId, this.mySession.getMyDispatchId(), this.consumerId.get(), this.mySession.getMyConnection().getSmqpConsumerCacheSize(), this.mySession.getMyConnection().getSmqpConsumerCacheSizeKB()));
        }
        finally {
            this.fillCacheLock.unlock();
        }
    }

    void fillCache() {
        this.fillCache(false);
    }

    void clearCache() {
        this.fillCachePending.set(false);
        this.messageCache.clear();
    }

    @Override
    public boolean isClosed() {
        return this.closed.get() || this.mySession.isClosed();
    }

    int getConsumerId() {
        return this.consumerId.get();
    }

    void setConsumerId(int id) {
        this.consumerId.set(id);
    }

    int getServerQueueConsumerId() {
        return this.serverQueueConsumerId;
    }

    void setServerQueueConsumerId(int id) {
        this.serverQueueConsumerId = id;
    }

    public String getMessageSelector() throws JMSException {
        this.verifyState();
        return this.messageSelector;
    }

    public MessageListener getMessageListener() throws JMSException {
        this.verifyState();
        return this.messageListener;
    }

    public void setMessageListener(MessageListener listener) throws JMSException {
        this.verifyState();
        if (listener != null && !this.consumerStarted.get()) {
            this.fillCache();
        }
        this.messageListener = listener;
        if (listener != null) {
            this.mySession.triggerInvocation();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void invokeMessageListener() {
        block22: {
            this.lock.lock();
            try {
                if (this.isClosed()) {
                    return;
                }
                AsyncMessageDeliveryRequest request = (AsyncMessageDeliveryRequest)this.messageCache.remove();
                MessageEntry messageEntry = request.getMessageEntry();
                MessageImpl msg = messageEntry.getMessage();
                messageEntry.moveMessageAttributes();
                MessageIndex msgIndex = msg.getMessageIndex();
                msg.setMessageConsumerImpl(this);
                try {
                    msg.reset();
                }
                catch (JMSException e) {
                    e.printStackTrace();
                }
                msg.setReadOnly(true);
                msg.setUseThreadContextCL(this.useThreadContextCL);
                String id = null;
                boolean duplicate = false;
                if (this.recordLog.get()) {
                    id = SessionImpl.buildId(this.uniqueConsumerId, msg);
                    boolean bl = duplicate = this.mySession.myConnection.isDuplicateMessageDetection() && this.mySession.isDuplicate(id);
                }
                if (this.reportDelivered.get()) {
                    this.reportDelivered(msg, false);
                }
                try {
                    if (!duplicate) {
                        if (this.recordLog.get() && this.mySession.myConnection.isDuplicateMessageDetection()) {
                            this.mySession.addCurrentTxLog(id);
                        }
                        this.mySession.withinOnMessage = true;
                        this.mySession.onMessageMessage = msg;
                        this.mySession.onMessageConsumer = this;
                        this.mySession.setTxCancelled(false);
                        this.messageListener.onMessage((Message)msg);
                        this.mySession.onMessageMessage = null;
                        this.mySession.onMessageConsumer = null;
                        this.mySession.withinOnMessage = false;
                        if (this.mySession.isTxCancelled() || this.mySession.acknowledgeMode == 2 && msg.isCancelled()) {
                            this.wasRecovered.set(false);
                            return;
                        }
                    }
                }
                catch (RuntimeException e) {
                    System.err.println("ERROR! MessageListener throws RuntimeException, shutting down consumer!");
                    e.printStackTrace();
                    try {
                        this.close(e.toString());
                    }
                    catch (JMSException jMSException) {
                        // empty catch block
                    }
                    return;
                }
                if (!this.wasRecovered.get()) {
                    if (request.isRequiresRestart()) {
                        this.fillCache();
                    }
                    if (!this.doAck.get()) break block22;
                    try {
                        this.acknowledgeMessage(msgIndex, false);
                        break block22;
                    }
                    catch (JMSException e) {
                        throw new RuntimeException(e);
                    }
                }
                this.wasRecovered.set(false);
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    protected void reportDelivered(Message message, boolean duplicate) {
        try {
            MessageIndex messageIndex = ((MessageImpl)message).getMessageIndex();
            this.requestRegistry.request(new MessageDeliveredRequest(this, this.mySession.dispatchId, this.serverQueueConsumerId, messageIndex, duplicate));
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public boolean acknowledgeMessage(MessageImpl message) throws JMSException {
        if (this.transacted.get()) {
            throw new IllegalStateException("acknowledge not possible, session is transacted!");
        }
        if (this.acknowledgeMode.get() != 2) {
            throw new IllegalStateException("acknowledge not possible, session was not created in mode CLIENT_ACKNOWLEDGE!");
        }
        return this.acknowledgeMessage(message.getMessageIndex(), true);
    }

    private boolean acknowledgeMessage(MessageIndex messageIndex, boolean replyRequired) throws JMSException {
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is closed");
        }
        Reply reply = null;
        boolean cancelled = false;
        try {
            if (messageIndex == null) {
                throw new JMSException("Unable to acknowledge message - missing message key!");
            }
            AcknowledgeMessageRequest request = new AcknowledgeMessageRequest(this, this.mySession.dispatchId, this.serverQueueConsumerId, messageIndex);
            request.setReplyRequired(replyRequired);
            reply = this.requestRegistry.request(request);
            if (request.isCancelledByValidator()) {
                cancelled = true;
                this.mySession.addCurrentTxToDuplicateLog();
            }
            this.mySession.removeCurrentTxFromRollbackLog();
            this.mySession.clearCurrentTxLog();
        }
        catch (Exception e) {
            if (this.isClosed()) {
                throw new IllegalStateException("Connection is closed: " + String.valueOf(e));
            }
            throw ExceptionConverter.convert(e);
        }
        if (replyRequired && !reply.isOk()) {
            if (this.isClosed()) {
                throw new IllegalStateException("Connection is closed: " + String.valueOf(reply.getException()));
            }
            throw ExceptionConverter.convert(reply.getException());
        }
        return cancelled;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Message receiveMessage(boolean block, long timeout) throws JMSException {
        this.lock.lock();
        try {
            this.verifyState();
            if (this.messageListener != null) {
                throw new JMSException("receive not allowed while a message listener has been set");
            }
            boolean wasDuplicate = false;
            boolean wasInvalidConnectionId = false;
            MessageImpl msg = null;
            String id = null;
            do {
                wasDuplicate = false;
                wasInvalidConnectionId = false;
                if (!this.consumerStarted.get()) {
                    this.fillCache();
                }
                do {
                    if (this.messageCache.getSize() != 0) continue;
                    if (block) {
                        this.receiverWaiting.set(true);
                        if (timeout == 0L) {
                            this.waiter.doWait();
                        } else {
                            long startWait;
                            long delta;
                            long to = timeout;
                            do {
                                startWait = System.currentTimeMillis();
                                this.waiter.doWait(to);
                            } while ((to -= (delta = System.currentTimeMillis() - startWait)) > 0L && this.messageCache.getSize() == 0 && this.fillCachePending.get() && !this.cancelled.get() && !this.isClosed());
                        }
                    } else if (this.fillCachePending.get() && this.receiveNoWaitFirstCall.get()) {
                        this.receiverWaiting.set(true);
                        this.waiter.doWait(1000L);
                    }
                    if (!this.cancelled.get()) continue;
                    Message to = null;
                    return to;
                } while (this.mySession.resetInProgress);
                this.receiverWaiting.set(false);
                if (this.messageCache.getSize() == 0 || this.isClosed()) {
                    Message to = null;
                    return to;
                }
                AsyncMessageDeliveryRequest request = (AsyncMessageDeliveryRequest)this.messageCache.remove();
                if (request.getConnectionId() != this.mySession.myConnection.getConnectionId()) {
                    wasInvalidConnectionId = true;
                    continue;
                }
                MessageEntry messageEntry = request.getMessageEntry();
                msg = messageEntry.getMessage();
                messageEntry.moveMessageAttributes();
                msg.setMessageConsumerImpl(this);
                msg.reset();
                msg.setReadOnly(true);
                msg.setUseThreadContextCL(this.useThreadContextCL);
                if (request.isRequiresRestart()) {
                    this.fillCache();
                }
                if (this.recordLog.get()) {
                    id = SessionImpl.buildId(this.uniqueConsumerId, msg);
                    boolean bl = wasDuplicate = this.mySession.myConnection.isDuplicateMessageDetection() && this.mySession.isDuplicate(id);
                }
                if (this.reportDelivered.get()) {
                    this.reportDelivered(msg, false);
                }
                if (this.doAck.get()) {
                    try {
                        this.acknowledgeMessage(msg.getMessageIndex(), false);
                    }
                    catch (JMSException jMSException) {
                        // empty catch block
                    }
                }
                if (!wasDuplicate) continue;
                msg = null;
            } while (wasDuplicate || wasInvalidConnectionId);
            if (this.recordLog.get() && this.mySession.myConnection.isDuplicateMessageDetection()) {
                this.mySession.addCurrentTxLog(id);
            }
            MessageImpl messageImpl = msg;
            return messageImpl;
        }
        finally {
            this.lock.unlock();
        }
    }

    public Message receive() throws JMSException {
        return this.receiveMessage(true, 0L);
    }

    public Message receive(long timeOut) throws JMSException {
        return this.receiveMessage(true, timeOut);
    }

    public Message receiveNoWait() throws JMSException {
        Message msg = this.receiveMessage(false, 0L);
        this.receiveNoWaitFirstCall.set(false);
        return msg;
    }

    void close(String exception) throws JMSException {
        this.lock.lock();
        try {
            if (this.isClosed()) {
                return;
            }
            this.closed.set(true);
            this.messageCache.clear();
        }
        finally {
            this.lock.unlock();
        }
        this.waiter.signal();
        Reply reply = null;
        try {
            reply = this.requestRegistry.request(new CloseConsumerRequest(this, this.mySession.dispatchId, this.mySession.dispatchId, this.serverQueueConsumerId, exception));
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        this.mySession.removeMessageConsumerImpl(this);
    }

    public void close() throws JMSException {
        if (this.closed.get()) {
            return;
        }
        if (!this.mySession.isSessionStarted()) {
            this.close(null);
            return;
        }
        CloseConsumer request = new CloseConsumer(this.consumerId.get());
        request._sem = new Semaphore();
        this.mySession.serviceRequest(request);
        request._sem.waitHere();
    }

    void cancel() {
        this.lock.lock();
        try {
            this.cancelled.set(true);
            this.closed.set(true);
            this.messageCache.clear();
        }
        finally {
            this.lock.unlock();
        }
        this.waiter.signal();
    }
}

