package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.TopicPartitions;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.lang.annotation.Annotation;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.eclipse.microprofile.reactive.messaging.Message;

@Experimental("Experimental API")
/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaCheckpointCommit.class */
public class KafkaCheckpointCommit extends ContextHolder implements KafkaCommitHandler {
    /** Checkpoint state per topic-partition; filled in {@code partitionsAssigned}, emptied by {@code removeFromState}. */
    private final Map<TopicPartition, CheckpointState<?>> checkpointStateMap;
    /** Id of the pending flush/health-check timer, or {@code -1} when no timer is scheduled. */
    private volatile long timerId;
    /** Delay handed to the Vert.x timer between flushes; the factory reads it from {@code auto.commit.interval.ms}. */
    private final int autoCommitInterval;
    /** Consumer whose underlying client is repositioned (seek) when partitions are assigned. */
    private final KafkaConsumer<?, ?> consumer;
    /** Store used to fetch and persist processing states; closed in {@code terminate}. */
    private final CheckpointStateStore stateStore;
    /** Failure callback; {@code checkHealth} invokes it with the exception and {@code true}. */
    private final BiConsumer<Throwable, Boolean> reportFailure;
    /** Value of the consumer's {@code client.id} configuration entry, used in log messages. */
    private final String consumerId;
    /** Threshold compared against {@code millisSinceLastPersistedOffset()}; values {@code <= 0} disable the health check. */
    private final int unsyncedStateMaxAge;

    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaCheckpointCommit$CheckpointState.class */
    /**
     * Mutable bookkeeping for a single topic-partition: the latest local processing state,
     * the offset/time of the last successful persist, and received/processed record counters.
     *
     * @param <T> type of the state object carried by the {@link ProcessingState}
     */
    public static class CheckpointState<T> {
        private final TopicPartition topicPartition;
        private final long createdTimestamp;
        private final AtomicLong received;
        private final AtomicLong processed;
        private volatile ProcessingState<T> processingState;
        private volatile OffsetPersistedAt persistedAt;

        private CheckpointState(TopicPartition topicPartition, ProcessingState<T> processingState, OffsetPersistedAt offsetPersistedAt) {
            this.topicPartition = topicPartition;
            this.createdTimestamp = System.currentTimeMillis();
            this.processingState = processingState;
            this.persistedAt = offsetPersistedAt;
            this.processed = new AtomicLong(0L);
            this.received = new AtomicLong(0L);
        }

        /**
         * Creates a state for the partition starting from {@code ProcessingState.EMPTY_STATE}.
         *
         * @param topicPartition the partition this state belongs to
         */
        @SuppressWarnings("unchecked") // the shared empty-state constant is reused for every T
        public CheckpointState(TopicPartition topicPartition) {
            this(topicPartition, (ProcessingState<T>) ProcessingState.EMPTY_STATE);
        }

        /**
         * Creates a state for the partition that has not been persisted yet.
         *
         * @param topicPartition the partition this state belongs to
         * @param processingState initial processing state; passed through {@code ProcessingState.getOrEmpty}
         */
        public CheckpointState(TopicPartition topicPartition, ProcessingState<T> processingState) {
            this(topicPartition, ProcessingState.getOrEmpty(processingState), OffsetPersistedAt.NOT_PERSISTED);
        }

        /**
         * Records where and when the state was last persisted.
         *
         * @param offsetPersistedAt the new persisted marker
         * @return this instance
         */
        public CheckpointState<T> withPersistedAt(OffsetPersistedAt offsetPersistedAt) {
            this.persistedAt = offsetPersistedAt;
            return this;
        }

        /**
         * Replaces the processing state with the result of applying {@code function}.
         * The function receives the current state, or the supplier's value when the current state is empty or null.
         *
         * @param supplier provides the input state when the current one is empty or null
         * @param function computes the new processing state
         * @return the state that was passed to {@code function}, i.e. the state before the transformation
         */
        public synchronized ProcessingState<T> transformState(Supplier<ProcessingState<T>> supplier, Function<ProcessingState<T>, ProcessingState<T>> function) {
            ProcessingState<T> previous = ProcessingState.isEmptyOrNull(this.processingState) ? supplier.get() : this.processingState;
            this.processingState = function.apply(previous);
            return previous;
        }

        public TopicPartition getTopicPartition() {
            return this.topicPartition;
        }

        public ProcessingState<T> getProcessingState() {
            return this.processingState;
        }

        public OffsetPersistedAt getPersistedAt() {
            return this.persistedAt;
        }

        /** Counts one more record as received. */
        public void receivedRecord() {
            this.received.incrementAndGet();
        }

        /** Counts one more record as processed. */
        public void processedRecord() {
            this.processed.incrementAndGet();
        }

        /**
         * @return received count minus processed count
         */
        public long getUnprocessedRecords() {
            return this.received.get() - this.processed.get();
        }

        /**
         * Age of the unsynced state in milliseconds.
         *
         * @return time since creation when nothing was ever persisted but records were received;
         *         time since the last persist when the local offset is ahead of the persisted one;
         *         {@code -1} otherwise
         */
        public long millisSinceLastPersistedOffset() {
            if (this.persistedAt.notPersisted() && this.received.get() > 0) {
                return System.currentTimeMillis() - this.createdTimestamp;
            }
            if (hasUnsyncedOffset()) {
                return System.currentTimeMillis() - this.persistedAt.getPersistedAt();
            }
            return -1L;
        }

        /**
         * @return {@code true} when the processing state is non-empty and its offset is greater than the persisted offset
         */
        public boolean hasUnsyncedOffset() {
            return !ProcessingState.isEmptyOrNull(this.processingState) && this.processingState.getOffset().longValue() > this.persistedAt.getOffset();
        }

        /** Two states are equal when they share the topic-partition and the creation timestamp. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            CheckpointState<?> other = (CheckpointState<?>) obj;
            return this.createdTimestamp == other.createdTimestamp && this.topicPartition.equals(other.topicPartition);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.topicPartition, Long.valueOf(this.createdTimestamp));
        }

        /** Renders every field under its own label. */
        @Override
        public String toString() {
            return "CheckpointState{topicPartition=" + this.topicPartition
                    + ", createdTimestamp=" + this.createdTimestamp
                    + ", received=" + this.received
                    + ", processed=" + this.processed
                    + ", processingState=" + this.processingState
                    + ", persistedAt=" + this.persistedAt + "}";
        }
    }

    /**
     * CDI factory registered under the {@code CHECKPOINT} strategy identifier; builds
     * {@link KafkaCheckpointCommit} instances backed by the configured checkpoint state store.
     */
    @ApplicationScoped
    @Identifier(KafkaCommitHandler.Strategy.CHECKPOINT)
    public static class Factory implements KafkaCommitHandler.Factory {
        Instance<CheckpointStateStore.Factory> stateStoreFactory;

        /**
         * @param instance all available state store factories, selected by identifier in {@code create}
         */
        @Inject
        public Factory(@Any Instance<CheckpointStateStore.Factory> instance) {
            this.stateStoreFactory = instance;
        }

        /**
         * Creates the commit handler for a channel.
         * <p>
         * Reads {@code default.api.timeout.ms} (default 60000) and {@code auto.commit.interval.ms}
         * (default 5000) from the channel configuration, selects the state store factory by the configured
         * identifier (falling back to {@code FileCheckpointStateStore.STATE_STORE_NAME}) and tries to load
         * the configured state type; a type that cannot be loaded is logged and replaced by {@code null}.
         *
         * @param kafkaConnectorIncomingConfiguration channel configuration
         * @param vertx Vert.x instance handed to the store and the handler
         * @param kafkaConsumer consumer the handler is attached to
         * @param biConsumer failure callback handed to the handler
         * @return the new commit handler
         */
        @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler.Factory
        public KafkaCommitHandler create(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Vertx vertx, KafkaConsumer<?, ?> kafkaConsumer, BiConsumer<Throwable, Boolean> biConsumer) {
            String groupId = (String) kafkaConsumer.configuration().get("group.id");
            int defaultTimeout = kafkaConnectorIncomingConfiguration.config()
                    .getOptionalValue("default.api.timeout.ms", Integer.class)
                    .orElse(60000);
            int commitInterval = kafkaConnectorIncomingConfiguration.config()
                    .getOptionalValue("auto.commit.interval.ms", Integer.class)
                    .orElse(5000);
            KafkaLogging.log.settingCommitInterval(groupId, commitInterval);

            String storeName = kafkaConnectorIncomingConfiguration.getCheckpointStateStore().orElseGet(() -> {
                KafkaLogging.log.checkpointDefaultStateStore();
                return FileCheckpointStateStore.STATE_STORE_NAME;
            });
            CheckpointStateStore.Factory storeFactory = this.stateStoreFactory.select(Identifier.Literal.of(storeName)).get();

            Class<?> stateType = kafkaConnectorIncomingConfiguration.getCheckpointStateType().map(className -> {
                try {
                    return Utils.loadClass(className, Object.class);
                } catch (ClassNotFoundException e) {
                    KafkaLogging.log.checkpointStateTypeNotFound(kafkaConnectorIncomingConfiguration.getChannel(), className);
                    return null;
                }
            }).orElse(null);

            CheckpointStateStore store = storeFactory.create(kafkaConnectorIncomingConfiguration, vertx, kafkaConsumer, stateType);
            int unsyncedStateMaxAge = kafkaConnectorIncomingConfiguration.getCheckpointUnsyncedStateMaxAgeMs().intValue();
            return new KafkaCheckpointCommit(vertx, kafkaConsumer, store, biConsumer, commitInterval, unsyncedStateMaxAge, defaultTimeout);
        }
    }

    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaCheckpointCommit$LastStateStoredTooLongAgoException.class */
    /**
     * Failure reported by the health check when a partition's processing state has stayed
     * unpersisted for longer than the configured maximum age.
     */
    public static class LastStateStoredTooLongAgoException extends NoStackTraceThrowable {
        private static final String MESSAGE_FORMAT = "Latest processing state for topic-partition `%s` persisted %d seconds ago. At the moment latest registered local processing state is for offset %d. The last offset for which a state is successfully persisted was %d.";

        /**
         * @param topicPartition partition whose state is stale
         * @param j seconds since the state was last persisted
         * @param j2 offset of the latest local processing state
         * @param j3 last offset for which a state was persisted
         */
        public LastStateStoredTooLongAgoException(TopicPartition topicPartition, long j, long j2, long j3) {
            super(describe(topicPartition, j, j2, j3));
        }

        /** Builds the exception message from the format constant. */
        private static String describe(TopicPartition partition, long secondsAgo, long localOffset, long persistedOffset) {
            return String.format(MESSAGE_FORMAT, partition, secondsAgo, localOffset, persistedOffset);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaCheckpointCommit$OffsetPersistedAt.class */
    /**
     * Immutable pair of a persisted offset and the wall-clock time (epoch millis) at which it was persisted.
     * {@link #NOT_PERSISTED} is the sentinel for "nothing persisted yet".
     */
    public static class OffsetPersistedAt {
        private final long offset;
        private final long persistedAt;
        /** Sentinel with offset {@code -1} and timestamp {@code -1}. */
        public static OffsetPersistedAt NOT_PERSISTED = new OffsetPersistedAt(-1, -1);

        /**
         * @param j the offset that has just been persisted
         * @return a marker for that offset stamped with the current time
         */
        public static OffsetPersistedAt persisted(long j) {
            return new OffsetPersistedAt(j, System.currentTimeMillis());
        }

        private OffsetPersistedAt(long j, long j2) {
            this.offset = j;
            this.persistedAt = j2;
        }

        /**
         * @return {@code true} when this marker equals {@link #NOT_PERSISTED}
         */
        public boolean notPersisted() {
            return NOT_PERSISTED.equals(this);
        }

        public long getOffset() {
            return this.offset;
        }

        public long getPersistedAt() {
            return this.persistedAt;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            OffsetPersistedAt other = (OffsetPersistedAt) obj;
            return this.offset == other.offset && this.persistedAt == other.persistedAt;
        }

        @Override
        public int hashCode() {
            return Objects.hash(Long.valueOf(this.offset), Long.valueOf(this.persistedAt));
        }

        /** Renders both fields; the timestamp is printed under {@code persistedAt}, not the offset. */
        @Override
        public String toString() {
            return "OffsetPersistedAt{offset=" + this.offset + ", persistedAt=" + this.persistedAt + "}";
        }
    }

    public KafkaCheckpointCommit(Vertx vertx, KafkaConsumer<?, ?> kafkaConsumer, CheckpointStateStore checkpointStateStore, BiConsumer<Throwable, Boolean> biConsumer, int i, int i2, int i3) {
        super(vertx, i3);
        this.checkpointStateMap = new ConcurrentHashMap();
        this.timerId = -1L;
        this.consumer = kafkaConsumer;
        this.consumerId = (String) kafkaConsumer.configuration().get("client.id");
        this.stateStore = checkpointStateStore;
        this.reportFailure = biConsumer;
        this.autoCommitInterval = i;
        this.unsyncedStateMaxAge = i2;
        if (i2 <= 0) {
            KafkaLogging.log.disableCheckpointCommitStrategyHealthCheck(this.consumerId);
        } else {
            KafkaLogging.log.setCheckpointCommitStrategyUnsyncedStateMaxAge(this.consumerId, i2);
        }
    }

    /** Cancels the pending flush/health-check timer, if any, and marks it as unscheduled ({@code -1}). */
    private void stopFlushAndCheckHealthTimer() {
        if (this.timerId == -1) {
            return;
        }
        this.vertx.cancelTimer(this.timerId);
        this.timerId = -1L;
    }

    /**
     * Schedules one flush/health-check run after {@code autoCommitInterval}, unless no partition is tracked.
     * The timer callback hands {@code flushAndCheckHealth} to {@code runOnContext}.
     */
    private void startFlushAndCheckHealthTimer() {
        if (!this.checkpointStateMap.isEmpty()) {
            this.timerId = this.vertx.setTimer(this.autoCommitInterval, ignoredTimerId -> {
                runOnContext(this::flushAndCheckHealth);
            });
        }
    }

    /**
     * Persists the state of all tracked partitions; whether that succeeds or fails, re-arms the timer
     * and then runs the health check. The outcome itself is deliberately ignored by the subscriber.
     */
    private void flushAndCheckHealth() {
        Uni<Void> flush = persistProcessingState(this.checkpointStateMap);
        flush.onItemOrFailure().invoke(() -> {
            startFlushAndCheckHealthTimer();
            checkHealth();
        }).subscribe().with(ignoredItem -> {
        }, ignoredFailure -> {
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v24, types: [java.lang.Throwable, java.lang.Object, io.smallrye.reactive.messaging.kafka.commit.KafkaCheckpointCommit$LastStateStoredTooLongAgoException] */
    private void checkHealth() {
        if (this.unsyncedStateMaxAge > 0) {
            for (Map.Entry<TopicPartition, CheckpointState<?>> entry : this.checkpointStateMap.entrySet()) {
                TopicPartition key = entry.getKey();
                CheckpointState<?> value = entry.getValue();
                long millisSinceLastPersistedOffset = value.millisSinceLastPersistedOffset();
                if (millisSinceLastPersistedOffset > ((long) this.unsyncedStateMaxAge)) {
                    ?? lastStateStoredTooLongAgoException = new LastStateStoredTooLongAgoException(key, millisSinceLastPersistedOffset / 1000, value.getProcessingState().getOffset().longValue(), ((CheckpointState) value).persistedAt.getOffset());
                    KafkaLogging.log.warnf(lastStateStoredTooLongAgoException, lastStateStoredTooLongAgoException.getMessage(), new Object[0]);
                    this.reportFailure.accept(lastStateStoredTooLongAgoException, true);
                }
            }
        }
    }

    /**
     * Registers an incoming record. When its partition is tracked, the received counter is incremented
     * and checkpoint metadata is injected into the record. Starts the flush timer when none is scheduled.
     *
     * @param incomingKafkaRecord the record just received
     * @return a {@code Uni} emitting the same record
     */
    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public <K, V> Uni<IncomingKafkaRecord<K, V>> received(IncomingKafkaRecord<K, V> incomingKafkaRecord) {
        TopicPartition partition = TopicPartitions.getTopicPartition(incomingKafkaRecord);
        CheckpointState<?> state = this.checkpointStateMap.get(partition);
        if (state != null) {
            state.receivedRecord();
            incomingKafkaRecord.injectMetadata(new DefaultCheckpointMetadata(partition, incomingKafkaRecord.getOffset(), state));
        }

        boolean timerIdle = this.timerId < 0;
        if (timerIdle) {
            startFlushAndCheckHealthTimer();
        }
        return Uni.createFrom().item(incomingKafkaRecord);
    }

    /**
     * Handles the acknowledgement of a record.
     * <p>
     * The work is passed to {@code VertxContext.runOnEventLoopContext} with this handler's context delegate.
     * If the record carries no checkpoint metadata, or its metadata refers to a checkpoint state other than
     * the one currently tracked for the partition, the acknowledgement is only logged. Otherwise the record
     * is counted as processed and, when the metadata holds a non-empty next state and asks for persist-on-ack,
     * the partition state is persisted before the returned {@code Uni} completes (persist failures are
     * recovered with {@code null}). In every other case the {@code Uni} completes right away.
     * Completion is dispatched through {@code incomingKafkaRecord.runOnMessageContext}.
     *
     * @param incomingKafkaRecord the acknowledged record
     * @return a {@code Uni} completing once the acknowledgement has been handled
     */
    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> incomingKafkaRecord) {
        return Uni.createFrom().completionStage(VertxContext.runOnEventLoopContext(this.context.getDelegate(), completableFuture -> {
            TopicPartition topicPartition = TopicPartitions.getTopicPartition(incomingKafkaRecord);
            CheckpointState<?> checkpointState = this.checkpointStateMap.get(topicPartition);
            DefaultCheckpointMetadata fromMessage = DefaultCheckpointMetadata.fromMessage((Message<?>) incomingKafkaRecord);
            // Metadata missing, or bound to a state object that is no longer the tracked one for this partition.
            if (fromMessage == null || !fromMessage.getCheckpointState().equals(checkpointState)) {
                KafkaLogging.log.acknowledgementFromRevokedTopicPartition(incomingKafkaRecord.getOffset(), topicPartition, this.consumerId, this.checkpointStateMap.keySet());
            } else {
                ProcessingState next = fromMessage.getNext();
                checkpointState.processedRecord();
                if (!ProcessingState.isEmptyOrNull(next) && fromMessage.isPersistOnAck()) {
                    // Persist only this partition; a failed persist is turned into a null item.
                    Uni recoverWithNull = persistProcessingState(Map.of(topicPartition, checkpointState)).onFailure().recoverWithNull();
                    // Explicit null check ahead of the method reference on the record.
                    Objects.requireNonNull(incomingKafkaRecord);
                    UniSubscribe subscribe = recoverWithNull.emitOn(incomingKafkaRecord::runOnMessageContext).subscribe();
                    Consumer consumer = r4 -> {
                        completableFuture.complete(null);
                    };
                    Objects.requireNonNull(completableFuture);
                    subscribe.with(consumer, completableFuture::completeExceptionally);
                    return;
                }
            }
            // Nothing to persist: complete immediately on the message context.
            incomingKafkaRecord.runOnMessageContext(() -> {
                completableFuture.complete(null);
            });
        }));
    }

    /**
     * Shuts the handler down: optionally waits for in-flight records, then untracks every partition,
     * persists the resulting processing state (blocking up to the configured timeout) and closes the store.
     *
     * @param z when {@code true}, first wait for received records to be processed and log how many
     *          are still unprocessed after the wait
     */
    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public void terminate(boolean z) {
        if (z) {
            long stillUnprocessed = waitForProcessing();
            if (stillUnprocessed > 0) {
                KafkaLogging.log.messageStillUnprocessedAfterTimeout(stillUnprocessed);
            }
        }
        Uni<Void> finalFlush = removeFromState(this.checkpointStateMap.keySet())
                .chain(this::persistProcessingState)
                .runSubscriptionOn(this::runOnContext);
        finalFlush.await().atMost(Duration.ofMillis(getTimeoutInMillis()));
        this.stateStore.close();
    }

    /**
     * Polls every 100 ms, for at most {@code autoCommitInterval / 100} attempts, until no tracked
     * partition has unprocessed records. If the waiting thread is interrupted, the interrupt flag is
     * restored and the wait ends immediately.
     *
     * @return the number of records still unprocessed when the wait ends ({@code 0} on success)
     */
    private long waitForProcessing() {
        int attempts = this.autoCommitInterval / 100;
        for (int attempt = 0; attempt < attempts; attempt++) {
            long unprocessed = countUnprocessedRecords();
            if (unprocessed == 0) {
                return unprocessed;
            }
            KafkaLogging.log.waitingForMessageProcessing(unprocessed);
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // With the flag set every further sleep would throw at once; stop polling.
                break;
            }
        }
        return countUnprocessedRecords();
    }

    /** Sums the unprocessed record counts of all tracked partitions. */
    private long countUnprocessedRecords() {
        return this.checkpointStateMap.values().stream()
                .mapToLong(CheckpointState::getUnprocessedRecords)
                .sum();
    }

    /**
     * Handles newly assigned partitions: stops the timer, fetches their processing state from the store
     * (blocking up to the configured timeout), starts tracking each partition with the fetched state,
     * re-arms the timer, and finally seeks the underlying consumer to each fetched offset
     * ({@code 0} when a partition's fetched state is {@code null}).
     *
     * @param collection the assigned partitions; nothing beyond stopping the timer happens when empty
     */
    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public void partitionsAssigned(Collection<TopicPartition> collection) {
        stopFlushAndCheckHealthTimer();
        if (collection.isEmpty()) {
            return;
        }
        Map<?, ?> fetchedStates = (Map<?, ?>) Uni.createFrom().deferred(() -> {
            return this.stateStore.fetchProcessingState(collection).onItem().invoke(states -> {
                KafkaLogging.log.checkpointPartitionsAssigned(this.consumerId, collection, states.toString());
            }).onFailure().invoke(failure -> {
                KafkaLogging.log.failedCheckpointPartitionsAssigned(this.consumerId, collection, failure);
            }).invoke(states -> {
                for (TopicPartition partition : collection) {
                    this.checkpointStateMap.put(partition, new CheckpointState<>(partition, (ProcessingState<?>) states.get(partition)));
                }
                startFlushAndCheckHealthTimer();
            });
        }).runSubscriptionOn(this::runOnContext).await().atMost(Duration.ofMillis(getTimeoutInMillis()));

        org.apache.kafka.clients.consumer.Consumer<?, ?> kafkaClient = this.consumer.unwrap();
        for (Map.Entry<?, ?> entry : fetchedStates.entrySet()) {
            ProcessingState<?> state = (ProcessingState<?>) entry.getValue();
            long offset = state != null ? state.getOffset().longValue() : 0L;
            kafkaClient.seek((TopicPartition) entry.getKey(), offset);
        }
    }

    /**
     * Stops the timer and stops tracking the given partitions.
     *
     * @param collection partitions to untrack; may be the live key set of the tracking map
     * @return a {@code Uni} emitting the checkpoint states of the partitions that were removed,
     *         so the caller can persist them
     */
    private Uni<Map<TopicPartition, CheckpointState<?>>> removeFromState(Collection<TopicPartition> collection) {
        return Uni.createFrom().emitter(emitter -> {
            stopFlushAndCheckHealthTimer();
            Map<TopicPartition, CheckpointState<?>> removed = new HashMap<>(this.checkpointStateMap);
            // Narrow the snapshot BEFORE mutating the live map: `collection` can be the live key set
            // (see terminate), which is empty once the removal below has run.
            removed.keySet().retainAll(collection);
            this.checkpointStateMap.keySet().removeAll(collection);
            emitter.complete(removed);
        });
    }

    /**
     * Handles revoked partitions: untracks them via {@code removeFromState}, logs the emitted states,
     * persists them, re-arms the timer, and blocks up to the configured timeout for all of that to finish.
     *
     * @param collection the revoked partitions
     */
    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public void partitionsRevoked(Collection<TopicPartition> collection) {
        Uni<Void> revocation = removeFromState(collection)
                .invoke(states -> {
                    KafkaLogging.log.checkpointPartitionsRevoked(this.consumerId, collection, states.toString());
                })
                .chain(this::persistProcessingState)
                .invoke(this::startFlushAndCheckHealthTimer)
                .runSubscriptionOn(this::runOnContext);
        revocation.await().atMost(Duration.ofMillis(getTimeoutInMillis()));
    }

    /**
     * Persists the processing state of every given partition whose local offset is ahead of the
     * persisted one. On success, the persisted marker of each still-tracked partition is updated to the
     * offset that was written; success and failure are both logged.
     *
     * @param map checkpoint states to consider
     * @return a {@code Uni} completing when the store has persisted the states, or immediately when
     *         nothing needs persisting
     */
    Uni<Void> persistProcessingState(Map<TopicPartition, CheckpointState<?>> map) {
        Map<TopicPartition, ProcessingState<?>> unsynced = new HashMap<>();
        for (Map.Entry<TopicPartition, CheckpointState<?>> entry : map.entrySet()) {
            CheckpointState<?> state = entry.getValue();
            if (state.hasUnsyncedOffset()) {
                unsynced.put(entry.getKey(), state.getProcessingState());
            }
        }
        if (unsynced.isEmpty()) {
            return Uni.createFrom().voidItem();
        }
        return this.stateStore.persistProcessingState(unsynced).onItem().invoke(() -> {
            // Stamp with the offset from the snapshot that was written, not the possibly newer live one.
            unsynced.forEach((partition, persistedState) -> {
                this.checkpointStateMap.computeIfPresent(partition, (ignoredKey, tracked) -> {
                    return tracked.withPersistedAt(OffsetPersistedAt.persisted(persistedState.getOffset().longValue()));
                });
            });
        }).onItem().invoke(() -> {
            KafkaLogging.log.checkpointPersistedState(this.consumerId, this.checkpointStateMap.toString());
        }).onFailure().invoke(failure -> {
            KafkaLogging.log.checkpointFailedPersistingState(this.consumerId, this.checkpointStateMap.toString(), failure);
        });
    }
}
