package com.hazelcast.topic.impl.reliable;

import com.hazelcast.cluster.ClusterService;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.Message;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.serialization.SerializationService;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.StaleSequenceException;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
import com.hazelcast.topic.ReliableMessageListener;

/* JADX WARN: Classes with same name are omitted:
  input_file:WEB-INF/lib/hazelcast-3.5.4.wso2v2.jar:com/hazelcast/topic/impl/reliable/ReliableMessageListenerRunner.class
 */
/* loaded from: input_file:WEB-INF/lib/hazelcast-all-3.5.4.jar:com/hazelcast/topic/impl/reliable/ReliableMessageListenerRunner.class */
/**
 * Pumps messages from the ringbuffer behind a reliable topic into a single
 * {@link ReliableMessageListener}.
 *
 * <p>The runner issues an asynchronous batch read, receives the outcome through the
 * {@link ExecutionCallback} methods, hands every item of the batch to the listener and then
 * issues the next read. It stops as soon as {@link #cancel()} has been called, either
 * externally or by the runner itself after a terminal failure.
 *
 * @param <E> type of the payload delivered to the listener
 */
class ReliableMessageListenerRunner<E> implements ExecutionCallback<ReadResultSet<ReliableTopicMessage>> {

    /** Listener that receives the messages; kept package-visible. */
    final ReliableMessageListener<E> listener;

    private final String id;
    private final ReliableTopicProxy<E> proxy;
    private final Ringbuffer<ReliableTopicMessage> ringbuffer;
    private final String topicName;
    private final SerializationService serializationService;
    private final ClusterService clusterService;
    private final ILogger logger;
    /** Upper bound on the number of items requested per ringbuffer read. */
    private final int readBatchSize;

    /** Ringbuffer sequence of the next item to hand to the listener. */
    private long sequence;
    /** Set once by {@link #cancel()}; checked before every read and before every delivery. */
    private volatile boolean cancelled;

    /**
     * Creates a runner for the given listener.
     *
     * <p>The starting sequence is taken from
     * {@link ReliableMessageListener#retrieveInitialSequence()}; when that returns {@code -1}
     * the runner starts one past the current tail of the ringbuffer.
     *
     * @param id       key under which this runner is registered in the proxy's runners map
     * @param listener listener that receives the messages
     * @param proxy    topic proxy supplying the ringbuffer, executor, configuration and statistics
     */
    public ReliableMessageListenerRunner(String id, ReliableMessageListener<E> listener, ReliableTopicProxy<E> proxy) {
        this.id = id;
        this.listener = listener;
        this.proxy = proxy;
        this.ringbuffer = proxy.ringbuffer;
        this.topicName = proxy.getName();

        NodeEngine engine = proxy.getNodeEngine();
        this.serializationService = engine.getSerializationService();
        this.clusterService = engine.getClusterService();
        this.logger = engine.getLogger(ReliableMessageListenerRunner.class);
        this.readBatchSize = proxy.topicConfig.getReadBatchSize();

        long initialSequence = listener.retrieveInitialSequence();
        if (initialSequence == -1) {
            // The listener expressed no preference: begin right after the current tail.
            this.sequence = ringbuffer.tailSequence() + 1;
        } else {
            this.sequence = initialSequence;
        }
    }

    /**
     * Requests the next batch from the ringbuffer, starting at the current sequence, and
     * registers this runner as the callback on the proxy's executor. Does nothing once the
     * runner has been cancelled.
     */
    public void next() {
        if (!cancelled) {
            // At least one and at most readBatchSize items, no filter.
            ringbuffer.readManyAsync(sequence, 1, readBatchSize, null).andThen(this, proxy.executor);
        }
    }

    /**
     * Delivers every item of a completed read to the listener and then requests the next batch.
     *
     * <p>Processing stops without a further read when the runner is found cancelled or when a
     * listener failure is judged terminal (in which case the runner cancels itself).
     *
     * @param batch items returned by the ringbuffer read
     */
    @Override // com.hazelcast.core.ExecutionCallback
    public void onResponse(ReadResultSet<ReliableTopicMessage> batch) {
        for (ReliableTopicMessage item : batch) {
            if (cancelled) {
                return;
            }
            if (!deliver(item)) {
                cancel();
                return;
            }
            // Advance even after a non-terminal failure so the item is not delivered again.
            sequence++;
        }
        next();
    }

    /**
     * Stores the current sequence on the listener, records the receive in the local topic
     * statistics and passes the message to the listener.
     *
     * @param item raw ringbuffer item to deliver
     * @return {@code false} when a failure occurred that requires terminating the listener,
     *         {@code true} otherwise (including after a tolerated failure)
     */
    private boolean deliver(ReliableTopicMessage item) {
        try {
            listener.storeSequence(sequence);
            proxy.localTopicStats.incrementReceives();
            listener.onMessage(toMessage(item));
            return true;
        } catch (Throwable failure) {
            return !terminate(failure);
        }
    }

    /**
     * Converts a raw ringbuffer item into the {@link Message} handed to the listener: the
     * publisher address is resolved to a member and the payload is deserialized.
     */
    private Message<E> toMessage(ReliableTopicMessage item) {
        MemberImpl publisher = clusterService.getMember(item.getPublisherAddress());
        E payload = serializationService.toObject(item.getPayload());
        long publishTime = item.getPublishTime();
        return new Message<E>(topicName, payload, publishTime, publisher);
    }

    /**
     * Handles a failed ringbuffer read.
     *
     * <p>A stale sequence on a loss-tolerant listener is recovered from by jumping to the head
     * sequence and reading again. Every other failure is logged and cancels the runner.
     *
     * @param failure cause reported by the read
     */
    @Override // com.hazelcast.core.ExecutionCallback
    public void onFailure(Throwable failure) {
        if (cancelled) {
            return;
        }

        if (failure instanceof StaleSequenceException) {
            if (recoverFromStaleSequence((StaleSequenceException) failure)) {
                return;
            }
        } else if (failure instanceof HazelcastInstanceNotActiveException) {
            if (logger.isFinestEnabled()) {
                logger.finest("Terminating MessageListener " + listener + " on topic: " + topicName + ".  Reason: HazelcastInstance is shutting down");
            }
        } else if (failure instanceof DistributedObjectDestroyedException) {
            if (logger.isFinestEnabled()) {
                logger.finest("Terminating MessageListener " + listener + " on topic: " + topicName + ". Reason: Topic is destroyed");
            }
        } else {
            logger.warning("Terminating MessageListener " + listener + " on topic: " + topicName + ". Reason: Unhandled exception, message: " + failure.getMessage(), failure);
        }

        cancel();
    }

    /**
     * Reacts to a stale read sequence.
     *
     * @param stale exception carrying the current head sequence
     * @return {@code true} when the listener tolerates loss and a new read from the head
     *         sequence was requested; {@code false} when the listener must be terminated
     */
    private boolean recoverFromStaleSequence(StaleSequenceException stale) {
        if (!listener.isLossTolerant()) {
            logger.warning("Terminating MessageListener:" + listener + " on topic: " + topicName + ". Reason: The listener was too slow or the retention period of the message has been violated. head: " + stale.getHeadSeq() + " sequence:" + sequence);
            return false;
        }

        if (logger.isFinestEnabled()) {
            logger.finest("MessageListener " + listener + " on topic: " + topicName + " ran into a stale sequence. Jumping from oldSequence: " + sequence + " to sequence: " + stale.getHeadSeq());
        }
        sequence = stale.getHeadSeq();
        next();
        return true;
    }

    /**
     * Marks this runner as cancelled and removes it from the proxy's runners map.
     */
    public void cancel() {
        cancelled = true;
        proxy.runnersMap.remove(id);
    }

    /**
     * Decides whether a failure raised while delivering a message ends the listener.
     *
     * @param failure throwable raised during delivery
     * @return {@code true} when the runner is already cancelled, when the listener declares the
     *         failure terminal, or when asking the listener (or logging the outcome) itself fails
     */
    private boolean terminate(Throwable failure) {
        if (cancelled) {
            return true;
        }

        try {
            if (listener.isTerminal(failure)) {
                logger.warning("Terminating MessageListener " + listener + " on topic: " + topicName + ". Reason: Unhandled exception, message: " + failure.getMessage(), failure);
                return true;
            }
            if (logger.isFinestEnabled()) {
                logger.finest("MessageListener " + listener + " on topic: " + topicName + " ran into an exception: message:" + failure.getMessage(), failure);
            }
            return false;
        } catch (Throwable secondary) {
            logger.warning("Terminating messageListener:" + listener + " on topic: " + topicName + ". Reason: Unhandled exception while calling ReliableMessageListener.isTerminal() method", secondary);
            return true;
        }
    }
}
