package com.hazelcast.jet.impl.observer;

import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.internal.util.UuidUtil;
import com.hazelcast.jet.Observable;
import com.hazelcast.jet.function.Observer;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.logging.ILogger;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.StaleSequenceException;
import com.hazelcast.ringbuffer.impl.RingbufferService;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
import com.hazelcast.spi.impl.executionservice.ExecutionService;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/observer/ObservableImpl.class */
/**
 * {@link Observable} implementation backed by a Hazelcast {@link Ringbuffer}.
 *
 * <p>The ringbuffer used for an observable called {@code name} is the one
 * returned by {@link #ringbufferName(String)}. Every registered
 * {@link Observer} gets its own {@link RingbufferListener}, which repeatedly
 * reads batches from that ringbuffer and forwards the items to the observer.
 *
 * @param <T> type of the items passed to {@link Observer#onNext}
 */
public class ObservableImpl<T> implements Observable<T> {
    /** Prefix prepended to an observable's name to obtain the name of its ringbuffer. */
    public static final String JET_OBSERVABLE_NAME_PREFIX = "__jet.observables.";
    public static final String OWNED_OBSERVABLE = ObservableImpl.class.getName() + ".ownedObservable";

    /** Active listeners, keyed by the registration id handed out by {@link #addObserver}. */
    private final ConcurrentMap<UUID, RingbufferListener<T>> listeners = new ConcurrentHashMap<>();
    private final String name;
    private final HazelcastInstance hzInstance;
    private final Consumer<Observable<T>> onDestroy;
    private final ILogger logger;

    /**
     * Reads batches of items from the observable's ringbuffer and dispatches
     * them to a single {@link Observer}. After each completed read the
     * listener schedules the next one, until it is cancelled.
     */
    private static class RingbufferListener<T> {
        /** Upper bound on the number of items requested by one read. */
        private static final int BATCH_SIZE = 1000;

        private final String id;
        private final Observer<T> observer;
        private final Ringbuffer<Object> ringbuffer;
        private final ILogger logger;
        private final Executor executor;
        /** Sequence the next read starts from. */
        private long sequence;
        private volatile boolean cancelled;

        /**
         * Looks up the ringbuffer of the given observable and positions the
         * listener at the ringbuffer's current head sequence.
         *
         * @param observableName name of the observable (not of the ringbuffer)
         * @param uuid           registration id; used as part of the listener id in log messages
         * @param observer       receiver of the items
         * @param hzInstance     instance providing the ringbuffer and the executor
         * @param logger         logger used for all listener messages
         */
        RingbufferListener(String observableName, UUID uuid, Observer<T> observer,
                           HazelcastInstance hzInstance, ILogger logger) {
            this.observer = observer;
            this.ringbuffer = hzInstance.getRingbuffer(ObservableImpl.ringbufferName(observableName));
            this.id = uuid.toString() + "/" + ringbuffer.getName();
            this.executor = getExecutor(hzInstance);
            this.sequence = ringbuffer.headSequence();
            this.logger = logger;
            logger.info("Starting message listener '" + id + "'");
        }

        /**
         * Picks the executor on which read results are processed: the async
         * executor of a member, or the task scheduler of a client.
         *
         * @throws RuntimeException if the instance is neither of the two known implementations
         */
        private static Executor getExecutor(HazelcastInstance hzInstance) {
            if (hzInstance instanceof HazelcastInstanceImpl) {
                return ((HazelcastInstanceImpl) hzInstance).node.getNodeEngine()
                        .getExecutionService().getExecutor(ExecutionService.ASYNC_EXECUTOR);
            }
            if (hzInstance instanceof HazelcastClientInstanceImpl) {
                return ((HazelcastClientInstanceImpl) hzInstance).getTaskScheduler();
            }
            throw new RuntimeException(String.format("Unhandled %s type: %s",
                    HazelcastInstance.class.getSimpleName(), hzInstance.getClass().getName()));
        }

        /**
         * Starts an asynchronous read from {@link #sequence} unless the
         * listener is cancelled; the outcome is handed to
         * {@link #accept(ReadResultSet, Throwable)} on {@link #executor}.
         */
        void next() {
            if (cancelled) {
                return;
            }
            ringbuffer.readManyAsync(sequence, 1, BATCH_SIZE, null)
                    .whenCompleteAsync(this::accept, executor);
        }

        /** Stops the listener: no further items are dispatched and no further reads are issued. */
        void cancel() {
            cancelled = true;
        }

        /**
         * Completion callback of a read. On failure, either retries or
         * cancels depending on {@link #handleInternalException(Throwable)}.
         * On success, dispatches the whole batch, advances the sequence and
         * schedules the next read.
         */
        private void accept(ReadResultSet<Object> result, Throwable failure) {
            if (cancelled) {
                return;
            }
            if (failure != null) {
                if (handleInternalException(failure)) {
                    next();
                } else {
                    cancel();
                }
                return;
            }

            // The sequence of the first item in the batch is (next - readCount); any
            // difference to the sequence we asked for is reported as lost messages.
            long lostCount = result.getNextSequenceToReadFrom() - result.readCount() - sequence;
            if (lostCount != 0) {
                logger.warning(String.format("Message loss of %d messages detected in listener '%s'",
                        lostCount, id));
            }

            for (int i = 0; i < result.size(); i++) {
                try {
                    // Re-check before every item so a cancel() takes effect mid-batch.
                    if (cancelled) {
                        return;
                    }
                    onNewMessage(result.get(i));
                } catch (Throwable t) {
                    logger.warning("Terminating message listener '" + id
                            + "'. Reason: Unhandled exception, message: " + t.getMessage(), t);
                    cancel();
                    return;
                }
            }
            sequence = result.getNextSequenceToReadFrom();
            next();
        }

        /**
         * Routes one ringbuffer item to the matching observer callback:
         * {@link WrappedThrowable} to {@code onError}, {@link DoneItem} to
         * {@code onComplete}, anything else to {@code onNext}. Anything thrown
         * by the callback is logged and otherwise ignored.
         */
        @SuppressWarnings("unchecked")
        private void onNewMessage(Object item) {
            try {
                if (item instanceof WrappedThrowable) {
                    observer.onError(((WrappedThrowable) item).get());
                } else if (item instanceof DoneItem) {
                    observer.onComplete();
                } else {
                    // The ringbuffer is typed as Object; the cast to T is unchecked.
                    observer.onNext((T) item);
                }
            } catch (Throwable t) {
                logger.warning("Exception thrown while calling observer callback for listener '" + id
                        + "'. Will be ignored. Reason: " + t.getMessage(), t);
            }
        }

        /**
         * Decides how to proceed after a failed read.
         *
         * @param t the failure reported by the read
         * @return {@code true} if the listener should issue another read,
         *         {@code false} if it should terminate
         */
        protected boolean handleInternalException(Throwable t) {
            if (t instanceof OperationTimeoutException) {
                return handleOperationTimeoutException();
            }
            if (t instanceof IllegalArgumentException) {
                return handleIllegalArgumentException((IllegalArgumentException) t);
            }
            if (t instanceof StaleSequenceException) {
                return handleStaleSequenceException((StaleSequenceException) t);
            }

            // Everything below terminates the listener; only the logging differs.
            if (t instanceof HazelcastInstanceNotActiveException) {
                logTerminationAtFinest("HazelcastInstance is shutting down");
            } else if (t instanceof HazelcastClientNotActiveException) {
                logTerminationAtFinest("HazelcastClient is shutting down");
            } else if (t instanceof DistributedObjectDestroyedException) {
                logTerminationAtFinest("Topic is destroyed");
            } else {
                logger.warning("Terminating message listener '" + id
                        + "'. Reason: Unhandled exception, message: " + t.getMessage(), t);
            }
            return false;
        }

        /** Logs the termination of this listener with the given reason, if finest logging is enabled. */
        private void logTerminationAtFinest(String reason) {
            if (logger.isFinestEnabled()) {
                logger.finest("Terminating message listener '" + id + "'. Reason: " + reason);
            }
        }

        /** A timed-out read is simply retried from the unchanged sequence. */
        private boolean handleOperationTimeoutException() {
            if (logger.isFinestEnabled()) {
                logger.finest("Message listener '" + id
                        + "' timed out. Continuing from last known sequence: " + sequence);
            }
            return true;
        }

        /** Moves the listener to the ringbuffer's current head sequence and retries. */
        private boolean handleIllegalArgumentException(IllegalArgumentException e) {
            long headSequence = ringbuffer.headSequence();
            if (logger.isFinestEnabled()) {
                logger.finest(String.format("Message listener '%s' requested a too large sequence: %s. "
                        + "Jumping from old sequence %d to sequence %d.",
                        id, e.getMessage(), sequence, headSequence));
            }
            adjustSequence(headSequence);
            return true;
        }

        /** Moves the listener to the ringbuffer's current head sequence and retries. */
        private boolean handleStaleSequenceException(StaleSequenceException e) {
            long headSequence = ringbuffer.headSequence();
            if (logger.isFinestEnabled()) {
                logger.finest("Message listener '" + id + "' ran into a stale sequence. Jumping from oldSequence "
                        + sequence + " to sequence " + headSequence + ".");
            }
            adjustSequence(headSequence);
            return true;
        }

        /**
         * Sets the read position to {@code newSequence}; a forward jump is
         * reported as message loss.
         */
        private void adjustSequence(long newSequence) {
            if (newSequence > sequence) {
                logger.warning(String.format("Message loss of %d messages detected in listener '%s'",
                        newSequence - sequence, id));
            }
            sequence = newSequence;
        }
    }

    /**
     * @param str               name of the observable
     * @param hazelcastInstance instance used to access the ringbuffer and the configuration
     * @param consumer          callback invoked with this observable at the end of {@link #destroy()}
     * @param iLogger           logger handed to every listener created by this observable
     */
    public ObservableImpl(String str, HazelcastInstance hazelcastInstance, Consumer<Observable<T>> consumer, ILogger iLogger) {
        this.name = str;
        this.hzInstance = hazelcastInstance;
        this.onDestroy = consumer;
        this.logger = iLogger;
    }

    @Override // com.hazelcast.jet.Observable
    @Nonnull
    public String name() {
        return name;
    }

    /**
     * Registers the observer and immediately starts reading for it.
     *
     * @return the registration id to pass to {@link #removeObserver(UUID)}
     */
    @Override // com.hazelcast.jet.Observable
    @Nonnull
    public UUID addObserver(@Nonnull Observer<T> observer) {
        UUID registrationId = UuidUtil.newUnsecureUUID();
        RingbufferListener<T> listener =
                new RingbufferListener<>(name, registrationId, observer, hzInstance, logger);
        listeners.put(registrationId, listener);
        listener.next();
        return registrationId;
    }

    /**
     * Unregisters and cancels the listener with the given registration id.
     *
     * @throws IllegalArgumentException if no listener is registered under that id
     */
    @Override // com.hazelcast.jet.Observable
    public void removeObserver(@Nonnull UUID uuid) {
        RingbufferListener<T> listener = listeners.remove(uuid);
        if (listener == null) {
            throw new IllegalArgumentException(
                    String.format("No registered observer with registration ID %s", uuid));
        }
        listener.cancel();
    }

    /**
     * Registers a ringbuffer configuration with the given capacity for this
     * observable's ringbuffer.
     *
     * @param i capacity to configure
     * @return this observable
     * @throws IllegalStateException if the ringbuffer already exists
     * @throws RuntimeException      if adding the configuration fails; the cause is preserved
     */
    @Override // com.hazelcast.jet.Observable
    public Observable<T> configureCapacity(int i) {
        String ringbufferName = ringbufferName(name);
        if (ringbufferExists(ringbufferName)) {
            throw new IllegalStateException("Underlying buffer for observable '" + name + "' is already created.");
        }
        try {
            RingbufferConfig config = new RingbufferConfig(ringbufferName).setCapacity(i);
            hzInstance.getConfig().addRingBufferConfig(config);
            return this;
        } catch (Exception e) {
            throw new RuntimeException("Failed configuring capacity: " + e, e);
        }
    }

    /**
     * Returns the capacity of the existing ringbuffer, narrowed to {@code int}.
     *
     * @throws IllegalStateException if the ringbuffer does not exist yet
     */
    @Override // com.hazelcast.jet.Observable
    public int getConfiguredCapacity() {
        String ringbufferName = ringbufferName(name);
        if (!ringbufferExists(ringbufferName)) {
            throw new IllegalStateException("Underlying buffer for observable '" + name + "' is not yet created.");
        }
        return (int) hzInstance.getRingbuffer(ringbufferName).capacity();
    }

    /** Checks whether a ringbuffer with the given name is among the instance's distributed objects. */
    private boolean ringbufferExists(String ringbufferName) {
        return hzInstance.getDistributedObjects().stream()
                .anyMatch(object -> object.getServiceName().equals(RingbufferService.SERVICE_NAME)
                        && object.getName().equals(ringbufferName));
    }

    /**
     * Cancels all listeners, destroys the underlying ringbuffer and finally
     * invokes the {@code onDestroy} callback.
     */
    @Override // com.hazelcast.jet.Observable
    public void destroy() {
        // Remove directly instead of going through removeObserver(): an id may be removed
        // concurrently between iteration and removal, and that must not abort the
        // destruction with an IllegalArgumentException.
        for (UUID registrationId : listeners.keySet()) {
            RingbufferListener<T> listener = listeners.remove(registrationId);
            if (listener != null) {
                listener.cancel();
            }
        }
        hzInstance.getRingbuffer(ringbufferName(name)).destroy();
        onDestroy.accept(this);
    }

    /**
     * Returns the name of the ringbuffer backing the observable with the given name.
     *
     * @param str name of the observable
     */
    @Nonnull
    public static String ringbufferName(String str) {
        return JET_OBSERVABLE_NAME_PREFIX + str;
    }
}
