package org.hornetq.core.client.impl;

import io.netty.handler.codec.http.multipart.DiskFileUpload;
import java.io.File;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.HornetQInterruptedException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.client.HornetQClientLogger;
import org.hornetq.core.client.HornetQClientMessageBundle;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.hornetq.utils.FutureLatch;
import org.hornetq.utils.LinkedListIterator;
import org.hornetq.utils.PriorityLinkedList;
import org.hornetq.utils.PriorityLinkedListImpl;
import org.hornetq.utils.ReusableLatch;
import org.hornetq.utils.TokenBucketLimiter;

/* loaded from: input_file:org/hornetq/core/client/impl/ClientConsumerImpl.class */
public final class ClientConsumerImpl implements ClientConsumerInternal {
    /** Ten seconds, in milliseconds; the same duration is applied when waiting for pending handler work. */
    private static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
    /** Number of priority levels held by the local receive buffer. */
    private static final int NUM_PRIORITIES = 10;
    /** Owning session; used for acknowledgements, expiry, forced delivery and consumer removal. */
    private final ClientSessionInternal session;
    /** Channel on which flow-credit and consumer-close packets are sent. */
    private final Channel channel;
    /** Consumer identifier placed in every packet sent on behalf of this consumer. */
    private final long id;
    /** Filter this consumer was created with; only exposed through its getter here. */
    private final SimpleString filterString;
    /** Name of the queue this consumer was created on; only exposed through its getter here. */
    private final SimpleString queueName;
    /** When true, {@code stop} does not flag the consumer as stopped. */
    private final boolean browseOnly;
    /** Executor on which handler deliveries ({@link Runner}) and completion latches are queued. */
    private final Executor sessionExecutor;
    /** Executor on which credit packets are sent. */
    private final Executor flowControlExecutor;
    /**
     * Flow-control window: a negative value disables credit handling, zero selects the
     * slow-consumer path (credits are requested one at a time), and any other value is the
     * threshold at which accumulated credits are returned to the server.
     */
    private final int clientWindowSize;
    /** Acknowledgements are batched until {@link #ackBytes} reaches this value. */
    private final int ackBatchSize;
    /** Controller receiving continuation packets for the large message currently in flight, or null. */
    private LargeMessageControllerImpl currentLargeMessageController;
    /** Large message last returned by a receive call; its body is discarded on the next receive. */
    private ClientMessageInternal largeMessageReceived;
    /** Optional rate limiter applied before each delivery; may be null. */
    private final TokenBucketLimiter rateLimiter;
    /** Thread currently inside a blocking receive, or null. */
    private volatile Thread receiverThread;
    /** Thread currently executing {@code MessageHandler.onMessage}, or null. */
    private volatile Thread onMessageThread;
    /** Asynchronous handler; null while the consumer is used through the receive methods. */
    private volatile MessageHandler handler;
    /** Set as soon as a close has been requested; incoming packets are ignored from then on. */
    private volatile boolean closing;
    /** Set once the consumer has been closed. */
    private volatile boolean closed;
    /** Credits accumulated locally and not yet returned to the server. */
    private volatile int creditsToSend;
    /** Raised by {@code clearAtFailover}; cleared by the next receive or handler delivery. */
    private volatile boolean failedOver;
    /** Last exception thrown out of a handler delivery, if any. */
    private volatile Exception lastException;
    /** Encoded size of the messages acknowledged since the last acknowledgement was sent. */
    private volatile int ackBytes;
    /** Most recent message whose acknowledgement is still batched, or null. */
    private volatile ClientMessageInternal lastAckedMessage;
    /** Sequence number attached to the next forced-delivery request. */
    private long forceDeliveryCount;
    /** Queue query response supplied at construction; provides the address for messages that lack one. */
    private final SessionQueueQueryResponseMessage queueInfo;
    /** Once set, every acknowledgement is sent individually instead of being batched. */
    private volatile boolean ackIndividually;
    /** Class loader installed as the thread context class loader while the handler runs. */
    private final ClassLoader contextClassLoader;
    /** Trace flag captured once when the class is initialised; later logger changes are not observed. */
    private static final boolean isTrace = HornetQClientLogger.LOGGER.isTraceEnabled();
    /** Property carried by the marker message that answers a forced-delivery request. */
    public static final SimpleString FORCED_DELIVERY_MESSAGE = new SimpleString("_hornetq.forced.delivery.seq");
    /** Counts credit packets that have been queued but not yet sent. */
    private final ReusableLatch pendingFlowControl = new ReusableLatch(0);
    /** Messages received from the server and not yet consumed, ordered by priority. */
    private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<ClientMessageInternal>(NUM_PRIORITIES);
    /** Reusable task that delivers one buffered message to the handler. */
    private final Runner runner = new Runner();
    /** While true, buffered messages are neither polled by receive nor delivered to the handler. */
    private boolean stopped = false;

    /**
     * Task queued on the session executor once per buffered message; each run
     * attempts to deliver a single message to the registered handler.
     */
    public class Runner implements Runnable {
        private Runner() {
        }

        /**
         * Delegates to {@code callOnMessage}; any exception is logged and kept
         * so that it can be retrieved through {@code getLastException}.
         */
        @Override
        public void run() {
            try {
                callOnMessage();
            } catch (Exception failure) {
                HornetQClientLogger.LOGGER.onMessageError(failure);
                lastException = failure;
            }
        }
    }

    /**
     * Creates a consumer bound to the given session and channel.
     *
     * @param clientSessionInternal owning session
     * @param j consumer id
     * @param simpleString queue name
     * @param simpleString2 filter string
     * @param z browse-only flag
     * @param i client window size used for flow control
     * @param i2 acknowledgement batch size
     * @param tokenBucketLimiter optional rate limiter; the code null-checks it before use
     * @param executor session executor used for handler deliveries
     * @param executor2 executor used for sending flow-control credits
     * @param channel channel used to talk to the server
     * @param sessionQueueQueryResponseMessage queue information for the consumed queue
     * @param classLoader context class loader installed while the handler runs
     */
    public ClientConsumerImpl(ClientSessionInternal clientSessionInternal, long j, SimpleString simpleString, SimpleString simpleString2, boolean z, int i, int i2, TokenBucketLimiter tokenBucketLimiter, Executor executor, Executor executor2, Channel channel, SessionQueueQueryResponseMessage sessionQueueQueryResponseMessage, ClassLoader classLoader) {
        // Identity of the consumer and of what it consumes.
        id = j;
        queueName = simpleString;
        filterString = simpleString2;
        browseOnly = z;
        queueInfo = sessionQueueQueryResponseMessage;

        // Collaborators.
        session = clientSessionInternal;
        this.channel = channel;
        sessionExecutor = executor;
        flowControlExecutor = executor2;
        contextClassLoader = classLoader;

        // Flow control and acknowledgement tuning.
        rateLimiter = tokenBucketLimiter;
        clientWindowSize = i;
        ackBatchSize = i2;
    }

    /**
     * Core synchronous receive.
     *
     * <p>Polls the local buffer, waiting on this object's monitor until a message
     * arrives, the consumer is closed or the timeout elapses. Expired messages are
     * reported to the session and skipped; forced-delivery marker messages are never
     * returned to the caller.
     *
     * @param j timeout in milliseconds; {@code 0} means wait without a practical limit
     * @param z when true, an empty buffer triggers a forced-delivery request and the
     *          call returns {@code null} as soon as the matching marker message arrives
     * @return the next live message, or {@code null} if none was obtained
     * @throws HornetQException if the consumer is closed or a message handler is set
     */
    private ClientMessage receive(long j, boolean z) throws HornetQException {
        checkClosed();
        // The body of the large message handed out by the previous call is no longer needed.
        if (this.largeMessageReceived != null) {
            this.largeMessageReceived.discardBody();
            this.largeMessageReceived = null;
        }
        if (this.rateLimiter != null) {
            this.rateLimiter.limit();
        }
        if (this.handler != null) {
            throw HornetQClientMessageBundle.BUNDLE.messageHandlerSet();
        }
        if (this.clientWindowSize == 0) {
            startSlowConsumer();
        }
        this.receiverThread = Thread.currentThread();
        boolean deliveryForced = false;
        boolean callForceDelivery = false;
        long start = -1;
        long toWait = j == 0 ? Long.MAX_VALUE : j;
        // The try/finally wraps the whole retry loop so that receiverThread stays set for
        // as long as this call is in progress; clearing it after each iteration would hide
        // a waiting receiver from doCleanUp's notify and from setMessageHandler's guard.
        try {
            while (true) {
                ClientMessageInternal message = null;
                synchronized (this) {
                    while (true) {
                        if (!this.stopped) {
                            message = this.buffer.poll();
                            if (message != null) {
                                break;
                            }
                        }
                        if (this.closed || toWait <= 0) {
                            break;
                        }
                        if (start == -1) {
                            start = System.currentTimeMillis();
                        }
                        if (z) {
                            if (this.stopped) {
                                break;
                            }
                            if (!deliveryForced) {
                                // Leave the monitor before asking the server to force delivery.
                                callForceDelivery = true;
                                break;
                            }
                        }
                        try {
                            wait(toWait);
                        } catch (InterruptedException e) {
                            throw new HornetQInterruptedException(e);
                        }
                        if (this.closed) {
                            break;
                        }
                        long now = System.currentTimeMillis();
                        toWait -= now - start;
                        start = now;
                    }
                }
                if (this.failedOver) {
                    this.failedOver = false;
                    if (message == null) {
                        // Nothing was obtained across a failover: reset the state and try
                        // again. Without restarting the loop these resets have no effect.
                        deliveryForced = false;
                        toWait = j == 0 ? Long.MAX_VALUE : j;
                        continue;
                    }
                }
                if (callForceDelivery) {
                    if (isTrace) {
                        HornetQClientLogger.LOGGER.trace("Forcing delivery");
                    }
                    this.session.forceDelivery(this.id, this.forceDeliveryCount++);
                    callForceDelivery = false;
                    deliveryForced = true;
                    continue;
                }
                if (message == null) {
                    if (isTrace) {
                        HornetQClientLogger.LOGGER.trace("Returning null");
                    }
                    resetIfSlowConsumer();
                    return null;
                }
                this.session.workDone();
                if (message.containsProperty(FORCED_DELIVERY_MESSAGE)) {
                    long sequence = message.getLongProperty(FORCED_DELIVERY_MESSAGE).longValue();
                    // Only the marker answering this call's own request ends the wait.
                    if (z && deliveryForced && sequence == this.forceDeliveryCount - 1) {
                        resetIfSlowConsumer();
                        if (isTrace) {
                            HornetQClientLogger.LOGGER.trace("There was nothing on the queue, leaving it now:: returning null");
                        }
                        return null;
                    }
                    if (isTrace) {
                        HornetQClientLogger.LOGGER.trace("Ignored force delivery answer as it belonged to another call");
                    }
                    continue;
                }
                // Expiry is sampled before flow control so both branches account for the message.
                boolean expired = message.isExpired();
                flowControlBeforeConsumption(message);
                if (!expired) {
                    if (message.isLargeMessage()) {
                        this.largeMessageReceived = message;
                    }
                    if (isTrace) {
                        HornetQClientLogger.LOGGER.trace("Returning " + message);
                    }
                    return message;
                }
                message.discardBody();
                this.session.expire(this.id, message.getMessageID());
                if (this.clientWindowSize == 0) {
                    startSlowConsumer();
                }
                if (toWait <= 0) {
                    return null;
                }
            }
        } finally {
            this.receiverThread = null;
        }
    }

    /**
     * Waits up to {@code j} milliseconds for a message. If that wait yields nothing
     * and the consumer is still open, a second forced-delivery attempt is made
     * before giving up.
     *
     * @param j timeout in milliseconds
     * @return the received message, or {@code null} if none was available
     * @throws HornetQException if the consumer is closed or a handler is set
     */
    @Override
    public ClientMessage receive(long j) throws HornetQException {
        final ClientMessage message = receive(j, false);
        if (message != null || closed) {
            return message;
        }
        return receive(0L, true);
    }

    /**
     * Receives with a timeout of zero and without forcing delivery; the core
     * receive treats a zero timeout as the maximum possible wait.
     *
     * @return the received message, or {@code null} if the wait ended without one
     * @throws HornetQException if the consumer is closed or a handler is set
     */
    @Override
    public ClientMessage receive() throws HornetQException {
        final long noTimeout = 0L;
        return receive(noTimeout, false);
    }

    /**
     * Receives with forced delivery enabled: when the local buffer is empty the
     * session is asked to force delivery, and {@code null} is returned once the
     * matching marker message arrives.
     *
     * @return a message, or {@code null} if none was obtained
     * @throws HornetQException if the consumer is closed or a handler is set
     */
    @Override
    public ClientMessage receiveImmediate() throws HornetQException {
        final long noTimeout = 0L;
        return receive(noTimeout, true);
    }

    /**
     * Returns the currently registered handler.
     *
     * @return the handler, or {@code null} when none is set
     * @throws HornetQException if the consumer is closed
     */
    @Override
    public MessageHandler getMessageHandler() throws HornetQException {
        checkClosed();
        return handler;
    }

    /**
     * Installs, replaces or removes the asynchronous message handler.
     *
     * <p>Installing a handler where none existed schedules one delivery task per
     * buffered message. Removing an existing handler asks to wait for in-flight
     * handler work.
     *
     * @param messageHandler the new handler, or {@code null} to remove the current one
     * @throws HornetQException if the consumer is closed or a thread is inside receive
     */
    @Override
    public synchronized void setMessageHandler(MessageHandler messageHandler) throws HornetQException {
        checkClosed();
        if (receiverThread != null) {
            throw HornetQClientMessageBundle.BUNDLE.inReceive();
        }
        final boolean hadNoHandler = handler == null;
        final boolean handlerChanges = handler != messageHandler;
        if (handlerChanges && clientWindowSize == 0) {
            startSlowConsumer();
        }
        handler = messageHandler;
        if (messageHandler != null) {
            if (hadNoHandler) {
                requeueExecutors();
            }
        } else if (!hadNoHandler) {
            // NOTE(review): waitForOnMessageToComplete returns at once when the handler
            // field is null, which it is at this point - confirm whether a wait is intended.
            waitForOnMessageToComplete(true);
        }
    }

    /**
     * Closes this consumer and sends a consumer-close packet to the server.
     *
     * @throws HornetQException if the clean-up fails
     */
    @Override
    public void close() throws HornetQException {
        final boolean sendCloseMessage = true;
        doCleanUp(sendCloseMessage);
    }

    /**
     * First phase of a close: flags the consumer as closing, cancels the large
     * message currently being received and queues the given latch behind the work
     * already submitted to the session executor.
     *
     * @param futureLatch latch run on the session executor
     * @return the thread currently inside the handler's {@code onMessage}, or {@code null}
     * @throws HornetQException declared by the interface; not thrown by the visible code
     */
    @Override
    public Thread prepareForClose(final FutureLatch futureLatch) throws HornetQException {
        closing = true;
        resetLargeMessageController();
        final Runnable releaseLatch = new Runnable() {
            @Override
            public void run() {
                futureLatch.run();
            }
        };
        sessionExecutor.execute(releaseLatch);
        return onMessageThread;
    }

    /**
     * Closes the consumer locally without sending a close packet to the server.
     * Failures are logged rather than propagated.
     */
    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public void cleanUp() {
        try {
            doCleanUp(false);
        } catch (HornetQException e) {
            // Pass the exception to the logger so the cause of the failure is not lost.
            HornetQClientLogger.LOGGER.warn("problem cleaning up: " + this, e);
        }
    }

    /**
     * @return {@code true} once the consumer has been closed
     */
    @Override
    public boolean isClosed() {
        return closed;
    }

    /**
     * Stops delivery. Browse-only consumers are never flagged as stopped.
     *
     * @param z whether to wait for in-flight handler work before stopping
     * @throws HornetQException declared by the interface; not thrown by the visible code
     */
    @Override
    public void stop(boolean z) throws HornetQException {
        waitForOnMessageToComplete(z);
        if (!browseOnly) {
            synchronized (this) {
                // Setting the flag again when it is already set changes nothing.
                stopped = true;
            }
        }
    }

    /**
     * Resets the local state at failover: buffered messages and the large message
     * in flight are dropped, pending acknowledgement and credit state is cleared,
     * delivery is stopped and the failover flag is raised for the next delivery
     * attempt to observe.
     */
    @Override
    public void clearAtFailover() {
        clearBuffer();
        stopped = true;
        resetLargeMessageController();
        lastAckedMessage = null;
        creditsToSend = 0;
        failedOver = true;
        ackIndividually = false;
    }

    /**
     * Resumes delivery and schedules one delivery task per message currently buffered.
     */
    @Override
    public synchronized void start() {
        stopped = false;
        requeueExecutors();
    }

    /**
     * @return the last exception recorded from a handler delivery, or {@code null}
     */
    @Override
    public Exception getLastException() {
        return lastException;
    }

    /**
     * @return the queue query response supplied when this consumer was created
     */
    @Override
    public SessionQueueQueryResponseMessage getQueueInfo() {
        return queueInfo;
    }

    /**
     * @return the identifier of this consumer
     */
    @Override
    public long getID() {
        return id;
    }

    /**
     * @return the filter string supplied when this consumer was created
     */
    @Override
    public SimpleString getFilterString() {
        return filterString;
    }

    /**
     * @return the queue name supplied when this consumer was created
     */
    @Override
    public SimpleString getQueueName() {
        return queueName;
    }

    /**
     * @return the browse-only flag supplied when this consumer was created
     */
    @Override
    public boolean isBrowseOnly() {
        return browseOnly;
    }

    /**
     * Entry point for a regular message packet. Packets are ignored once the
     * consumer is closing; messages flagged as compressed take the compressed path.
     *
     * @param sessionReceiveMessage the packet carrying the message
     * @throws Exception if handling the message fails
     */
    @Override
    public synchronized void handleMessage(SessionReceiveMessage sessionReceiveMessage) throws Exception {
        if (closing) {
            return;
        }
        final boolean compressed = sessionReceiveMessage.getMessage().getBooleanProperty(Message.HDR_LARGE_COMPRESSED).booleanValue();
        if (!compressed) {
            handleRegularMessage((ClientMessageInternal) sessionReceiveMessage.getMessage(), sessionReceiveMessage);
            return;
        }
        handleCompressedMessage(sessionReceiveMessage);
    }

    /**
     * Copies the delivery count and packet size from the packet onto the message
     * and then buffers the message.
     *
     * @param clientMessageInternal the message to buffer
     * @param sessionReceiveMessage the packet the message arrived in
     * @throws Exception if buffering fails
     */
    private void handleRegularMessage(ClientMessageInternal clientMessageInternal, SessionReceiveMessage sessionReceiveMessage) throws Exception {
        final ClientMessageInternal message = clientMessageInternal;
        message.setDeliveryCount(sessionReceiveMessage.getDeliveryCount());
        message.setFlowControlSize(sessionReceiveMessage.getPacketSize());
        handleRegularMessage(message);
    }

    /**
     * Adds a message to the local buffer and wakes up whoever consumes it: a thread
     * waiting in receive when no handler is set, otherwise a delivery task on the
     * session executor unless the consumer is stopped.
     *
     * <p>Calls {@code notify()}, so the caller must hold this object's monitor.
     *
     * @param clientMessageInternal the message to buffer
     */
    private void handleRegularMessage(ClientMessageInternal clientMessageInternal) {
        final ClientMessageInternal message = clientMessageInternal;
        if (message.getAddress() == null) {
            message.setAddressTransient(queueInfo.getAddress());
        }
        message.onReceipt(this);
        // The first real message whose priority differs from 4 switches the consumer to
        // individual acknowledgements (presumably 4 is the default priority - confirm).
        if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(FORCED_DELIVERY_MESSAGE)) {
            ackIndividually = true;
        }
        buffer.addTail(message, message.getPriority());
        if (handler == null) {
            notify();
            return;
        }
        if (!stopped) {
            queueExecutor();
        }
    }

    /**
     * Handles a message whose compressed body arrived inside a regular packet.
     * The message is re-wrapped as a large message backed by a local controller
     * that is fed the whole body as a single continuation packet, and is then
     * buffered like any other message.
     *
     * @param sessionReceiveMessage the packet carrying the compressed message
     * @throws Exception if the temporary cache file cannot be created or buffering fails
     */
    private void handleCompressedMessage(SessionReceiveMessage sessionReceiveMessage) throws Exception {
        ClientMessageImpl clientMessageImpl = (ClientMessageImpl) sessionReceiveMessage.getMessage();
        ClientLargeMessageImpl clientLargeMessageImpl = new ClientLargeMessageImpl();
        clientLargeMessageImpl.retrieveExistingData(clientMessageImpl);
        File file = null;
        if (this.session.isCacheLargeMessageClient()) {
            // Fixed suffix: the name must not depend on Netty's mutable static
            // DiskFileUpload.postfix, which other code may reassign.
            file = File.createTempFile("tmp-large-message-" + clientLargeMessageImpl.getMessageID() + "-", ".tmp");
            file.deleteOnExit();
        }
        this.currentLargeMessageController = new LargeMessageControllerImpl(this, clientLargeMessageImpl.getLargeMessageSize(), this.session.getSessionFactory().getServerLocator().getCallTimeout(), file);
        this.currentLargeMessageController.setLocal(true);
        HornetQBuffer bodyBuffer = clientMessageImpl.getBodyBuffer();
        // Everything readable in the body buffer becomes the payload of the single packet.
        byte[] array = bodyBuffer.readBytes(bodyBuffer.writerIndex() - bodyBuffer.readerIndex()).toByteBuffer().array();
        clientLargeMessageImpl.setLargeMessageController(new CompressedLargeMessageControllerImpl(this.currentLargeMessageController));
        this.currentLargeMessageController.addPacket(new SessionReceiveContinuationMessage(getID(), array, false, false, array.length));
        handleRegularMessage(clientLargeMessageImpl, sessionReceiveMessage);
    }

    /**
     * Entry point for the first packet of a large message. A controller is created
     * to collect the continuation packets, optionally cached in a temporary file,
     * and the message is buffered. Packets are ignored once the consumer is closing.
     *
     * @param sessionReceiveLargeMessage the packet announcing the large message
     * @throws Exception if the temporary cache file cannot be created
     */
    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public synchronized void handleLargeMessage(SessionReceiveLargeMessage sessionReceiveLargeMessage) throws Exception {
        if (this.closing) {
            return;
        }
        ClientLargeMessageInternal clientLargeMessageInternal = (ClientLargeMessageInternal) sessionReceiveLargeMessage.getLargeMessage();
        clientLargeMessageInternal.setFlowControlSize(sessionReceiveLargeMessage.getPacketSize());
        clientLargeMessageInternal.setDeliveryCount(sessionReceiveLargeMessage.getDeliveryCount());
        File file = null;
        if (this.session.isCacheLargeMessageClient()) {
            // Fixed suffix: the name must not depend on Netty's mutable static
            // DiskFileUpload.postfix, which other code may reassign.
            file = File.createTempFile("tmp-large-message-" + clientLargeMessageInternal.getMessageID() + "-", ".tmp");
            file.deleteOnExit();
        }
        this.currentLargeMessageController = new LargeMessageControllerImpl(this, sessionReceiveLargeMessage.getLargeMessageSize(), this.session.getSessionFactory().getServerLocator().getCallTimeout(), file);
        if (clientLargeMessageInternal.isCompressed()) {
            clientLargeMessageInternal.setLargeMessageController(new CompressedLargeMessageControllerImpl(this.currentLargeMessageController));
        } else {
            clientLargeMessageInternal.setLargeMessageController(this.currentLargeMessageController);
        }
        handleRegularMessage(clientLargeMessageInternal);
    }

    /**
     * Entry point for a continuation packet of a large message. The packet goes to
     * the current controller; when there is none, its size is returned to the
     * server as credits. Packets are ignored once the consumer is closing.
     *
     * @param sessionReceiveContinuationMessage the continuation packet
     * @throws Exception if the controller or flow control fails
     */
    @Override
    public synchronized void handleLargeMessageContinuation(SessionReceiveContinuationMessage sessionReceiveContinuationMessage) throws Exception {
        if (closing) {
            return;
        }
        if (currentLargeMessageController == null) {
            if (isTrace) {
                HornetQClientLogger.LOGGER.trace("Sending back credits for largeController = null " + sessionReceiveContinuationMessage.getPacketSize());
            }
            flowControl(sessionReceiveContinuationMessage.getPacketSize(), false);
        } else {
            currentLargeMessageController.addPacket(sessionReceiveContinuationMessage);
        }
    }

    /**
     * Discards everything held locally. Each buffered message has its credits
     * accounted for, and large messages are cancelled first. The buffer is then
     * emptied and the large message in flight is cancelled. Failures on individual
     * messages are logged and do not stop the sweep.
     *
     * @param z whether to wait afterwards for in-flight handler work
     * @throws HornetQException declared by the interface; per-message failures are only logged
     */
    @Override
    public void clear(boolean z) throws HornetQException {
        synchronized (this) {
            final LinkedListIterator<ClientMessageInternal> pending = buffer.iterator();
            while (pending.hasNext()) {
                try {
                    final ClientMessageInternal message = pending.next();
                    if (message.isLargeMessage()) {
                        final ClientLargeMessageInternal large = (ClientLargeMessageInternal) message;
                        large.getLargeMessageController().cancel();
                    }
                    flowControlBeforeConsumption(message);
                } catch (Exception failure) {
                    HornetQClientLogger.LOGGER.errorClearingMessages(failure);
                }
            }
            clearBuffer();
            try {
                resetLargeMessageController();
            } catch (Throwable failure) {
                HornetQClientLogger.LOGGER.errorClearingMessages(failure);
            }
        }
        waitForOnMessageToComplete(z);
    }

    /**
     * Cancels and forgets the controller of the large message in flight, if any.
     */
    private void resetLargeMessageController() {
        final LargeMessageControllerImpl controller = currentLargeMessageController;
        if (controller == null) {
            return;
        }
        controller.cancel();
        currentLargeMessageController = null;
    }

    /**
     * @return the client window size supplied when this consumer was created
     */
    @Override
    public int getClientWindowSize() {
        return clientWindowSize;
    }

    /**
     * @return the number of messages currently held in the local buffer
     */
    @Override
    public int getBufferSize() {
        final int buffered = buffer.size();
        return buffered;
    }

    /**
     * Acknowledges a message. In individual mode the acknowledgement is sent at
     * once. Otherwise the encoded size is accumulated and an acknowledgement is
     * sent only when the batch size is reached; until then the message is
     * remembered as the latest pending one.
     *
     * @param clientMessage the message to acknowledge
     * @throws HornetQException if sending the acknowledgement fails
     */
    @Override
    public void acknowledge(ClientMessage clientMessage) throws HornetQException {
        final ClientMessageInternal internal = (ClientMessageInternal) clientMessage;
        if (ackIndividually) {
            individualAcknowledge(clientMessage);
            return;
        }
        ackBytes += clientMessage.getEncodeSize();
        if (ackBytes < ackBatchSize) {
            lastAckedMessage = internal;
        } else {
            doAck(internal);
        }
    }

    /**
     * Acknowledges exactly one message, after sending any batched acknowledgement
     * that is still pending.
     *
     * @param clientMessage the message to acknowledge
     * @throws HornetQException if sending an acknowledgement fails
     */
    @Override
    public void individualAcknowledge(ClientMessage clientMessage) throws HornetQException {
        // flushAcks does nothing when no acknowledgement is pending.
        flushAcks();
        final long messageId = clientMessage.getMessageID();
        session.individualAcknowledge(id, messageId);
    }

    /**
     * Sends the batched acknowledgement, if one is pending.
     *
     * @throws HornetQException if sending the acknowledgement fails
     */
    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public void flushAcks() throws HornetQException {
        // Read the volatile field once: it can be reset to null by another thread
        // (for example at failover) between a null check and a second read.
        final ClientMessageInternal pending = this.lastAckedMessage;
        if (pending != null) {
            doAck(pending);
        }
    }

    /**
     * Accounts for consumed credits and returns them to the server once the
     * accumulated amount reaches the window size. Nothing happens when the window
     * size is negative.
     *
     * @param i credits to add to the pending total
     * @param z when true and the window size is zero, one credit is withheld from
     *          the amount returned
     * @throws HornetQException declared by the interface; not thrown by the visible code
     */
    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public void flowControl(int i, boolean z) throws HornetQException {
        if (this.clientWindowSize < 0) {
            return;
        }
        this.creditsToSend += i;
        if (this.creditsToSend < this.clientWindowSize) {
            return;
        }
        if (this.clientWindowSize == 0 && z) {
            if (isTrace) {
                HornetQClientLogger.LOGGER.trace("FlowControl::Sending " + this.creditsToSend + " -1, for slow consumer");
            }
            final int credits = this.creditsToSend - 1;
            this.creditsToSend = 0;
            if (credits > 0) {
                sendCredits(credits);
            }
            return;
        }
        final int credits = this.creditsToSend;
        if (HornetQClientLogger.LOGGER.isDebugEnabled()) {
            // Report the amount actually sent, not just the latest increment.
            HornetQClientLogger.LOGGER.debug("Sending " + credits + " from flow-control");
        }
        this.creditsToSend = 0;
        if (credits > 0) {
            sendCredits(credits);
        }
    }

    /**
     * Requests a single credit and waits up to ten seconds for the pending credit
     * packets to be sent. An interrupt ends the wait and is re-asserted on the
     * thread.
     */
    private void startSlowConsumer() {
        if (isTrace) {
            HornetQClientLogger.LOGGER.trace("Sending 1 credit to start delivering of one message to slow consumer");
        }
        sendCredits(1);
        try {
            pendingFlowControl.await(10L, TimeUnit.SECONDS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * For a zero window size only: sends a zero-credit packet and then waits up to
     * ten seconds for a marker task submitted to the flow-control executor to run.
     * Presumably this ensures the credit packet was dispatched first, which assumes
     * the executor runs tasks in submission order - confirm.
     *
     * @throws HornetQInterruptedException if the wait is interrupted
     */
    private void resetIfSlowConsumer() {
        if (clientWindowSize != 0) {
            return;
        }
        sendCredits(0);
        final CountDownLatch markerRan = new CountDownLatch(1);
        final Runnable marker = new Runnable() {
            @Override
            public void run() {
                markerRan.countDown();
            }
        };
        flowControlExecutor.execute(marker);
        try {
            markerRan.await(10L, TimeUnit.SECONDS);
        } catch (InterruptedException interrupted) {
            throw new HornetQInterruptedException(interrupted);
        }
    }

    /**
     * Queues delivery tasks while the number queued is below the buffer size. The
     * size is re-read before every task, so a buffer that changes concurrently
     * changes how many tasks are queued.
     */
    private void requeueExecutors() {
        int queued = 0;
        while (queued < buffer.size()) {
            queueExecutor();
            queued++;
        }
    }

    /**
     * Submits the shared delivery task to the session executor.
     */
    private void queueExecutor() {
        if (isTrace) {
            HornetQClientLogger.LOGGER.trace("Adding Runner on Executor for delivery");
        }
        final Runnable delivery = runner;
        sessionExecutor.execute(delivery);
    }

    /**
     * Sends a flow-credit packet asynchronously on the flow-control executor.
     * The pending-flow-control latch is incremented before the task is queued and
     * decremented exactly once, whether or not the send succeeds.
     *
     * @param i number of credits to grant
     */
    private void sendCredits(final int i) {
        this.pendingFlowControl.countUp();
        try {
            this.flowControlExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        channel.send(new SessionConsumerFlowCreditMessage(id, i));
                    } finally {
                        // Single decrement on every path.
                        pendingFlowControl.countDown();
                    }
                }
            });
        } catch (RuntimeException rejected) {
            // The task will never run, so rebalance the latch before propagating.
            this.pendingFlowControl.countDown();
            throw rejected;
        }
    }

    /**
     * Waits for the work already queued on the session executor by queueing a
     * latch behind it. Returns at once when no handler is set, when waiting was not
     * requested, or when called from the thread that is running the handler. A
     * timeout is logged, not thrown.
     *
     * @param z whether to wait at all
     */
    private void waitForOnMessageToComplete(boolean z) {
        if (handler == null) {
            return;
        }
        if (!z) {
            return;
        }
        if (Thread.currentThread() == onMessageThread) {
            // Waiting for ourselves would only ever end in the timeout.
            return;
        }
        final FutureLatch latch = new FutureLatch();
        sessionExecutor.execute(latch);
        final boolean completed = latch.await(CLOSE_TIMEOUT_MILLISECONDS);
        if (!completed) {
            HornetQClientLogger.LOGGER.timeOutWaitingForProcessing();
        }
    }

    /**
     * Guard used by the public operations.
     *
     * @throws HornetQException if the consumer has been closed
     */
    private void checkClosed() throws HornetQException {
        if (!closed) {
            return;
        }
        throw HornetQClientMessageBundle.BUNDLE.consumerClosed();
    }

    /**
     * Delivers at most one buffered message to the registered handler.
     *
     * <p>Nothing happens when the consumer is closing or stopped, when no handler
     * is set, or when the buffer is empty. Forced-delivery marker messages are
     * dropped and expired messages are reported to the session instead of being
     * delivered. While the handler runs, the consumer's context class loader is
     * installed on the thread; the previous loader is restored afterwards, even if
     * the handler throws.
     *
     * @throws Exception anything thrown by the handler or by the session calls
     */
    public void callOnMessage() throws Exception {
        if (this.closing || this.stopped) {
            return;
        }
        this.session.workDone();
        // Work on a snapshot so a concurrent handler change cannot affect this delivery.
        final MessageHandler messageHandler = this.handler;
        if (messageHandler == null) {
            return;
        }
        if (this.rateLimiter != null) {
            this.rateLimiter.limit();
        }
        this.failedOver = false;
        final ClientMessageInternal message;
        synchronized (this) {
            message = this.buffer.poll();
        }
        if (message == null || message.containsProperty(FORCED_DELIVERY_MESSAGE)) {
            return;
        }
        // Expiry is sampled before flow control so both outcomes account for the message.
        final boolean expired = message.isExpired();
        flowControlBeforeConsumption(message);
        if (expired) {
            this.session.expire(this.id, message.getMessageID());
        } else {
            if (isTrace) {
                HornetQClientLogger.LOGGER.trace("Calling handler.onMessage");
            }
            final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
                @Override
                public ClassLoader run() {
                    ClassLoader previous = Thread.currentThread().getContextClassLoader();
                    Thread.currentThread().setContextClassLoader(ClientConsumerImpl.this.contextClassLoader);
                    return previous;
                }
            });
            this.onMessageThread = Thread.currentThread();
            try {
                messageHandler.onMessage(message);
            } finally {
                // Restore the loader exactly once; a failure here is logged, never swallowed.
                try {
                    AccessController.doPrivileged(new PrivilegedAction<Object>() {
                        @Override
                        public Object run() {
                            Thread.currentThread().setContextClassLoader(originalLoader);
                            return null;
                        }
                    });
                } catch (Exception e) {
                    HornetQClientLogger.LOGGER.warn(e.getMessage(), e);
                }
                this.onMessageThread = null;
            }
            if (isTrace) {
                HornetQClientLogger.LOGGER.trace("Handler.onMessage done");
            }
            if (message.isLargeMessage()) {
                message.discardBody();
            }
        }
        if (this.clientWindowSize == 0) {
            startSlowConsumer();
        }
    }

    /**
     * Accounts for the credits of a message that is about to be consumed. Messages
     * with a flow-control size of zero are skipped. The withhold-one-credit flag
     * passed to {@code flowControl} is set for every message that is not large.
     *
     * @param clientMessageInternal the message being consumed
     * @throws HornetQException if flow control fails
     */
    private void flowControlBeforeConsumption(ClientMessageInternal clientMessageInternal) throws HornetQException {
        final int size = clientMessageInternal.getFlowControlSize();
        if (size == 0) {
            return;
        }
        final boolean regularMessage = !clientMessageInternal.isLargeMessage();
        flowControl(size, regularMessage);
    }

    /**
     * Closes the consumer. Waits for in-flight handler work, cancels the large
     * message in flight, wakes any thread blocked in receive, sends pending
     * acknowledgements, empties the buffer and optionally notifies the server.
     * Once the consumer is flagged as closed it is always removed from the
     * session, even if a later step fails.
     *
     * @param z whether to send a consumer-close packet to the server
     * @throws HornetQException if acknowledging or notifying the server fails
     */
    private void doCleanUp(boolean z) throws HornetQException {
        if (this.closed) {
            return;
        }
        this.closing = true;
        waitForOnMessageToComplete(true);
        resetLargeMessageController();
        this.closed = true;
        // After this point a second call returns early, so the removal below is
        // the only chance to detach this consumer from its session.
        try {
            synchronized (this) {
                if (this.receiverThread != null) {
                    notify();
                }
                this.handler = null;
                this.receiverThread = null;
            }
            flushAcks();
            clearBuffer();
            if (z) {
                // 21 is the expected response packet type (presumably the null
                // response - confirm against the packet type constants).
                this.channel.sendBlocking(new SessionConsumerCloseMessage(this.id), (byte) 21);
            }
        } finally {
            this.session.removeConsumer(this);
        }
    }

    /**
     * Drops every message held in the local buffer.
     */
    private void clearBuffer() {
        buffer.clear();
    }

    /**
     * Resets the batching state and acknowledges the given message through the session.
     *
     * @param clientMessageInternal the message to acknowledge
     * @throws HornetQException if the session fails to acknowledge
     */
    private void doAck(ClientMessageInternal clientMessageInternal) throws HornetQException {
        ackBytes = 0;
        lastAckedMessage = null;
        final long messageId = clientMessageInternal.getMessageID();
        session.acknowledge(id, messageId);
    }
}
