package io.smallrye.reactive.messaging.kafka.reply;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.mutiny.subscription.UniEmitter;
import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ConfigHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.smallrye.reactive.messaging.kafka.impl.TopicPartitions;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterImpl;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.inject.Instance;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;

@Experimental("Experimental API")
/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.class */
public class KafkaRequestReplyImpl<Req, Rep> extends MutinyEmitterImpl<Req> implements KafkaRequestReply<Req, Rep>, MultiSubscriber<KafkaRecord<?, Rep>> {
    private final Map<CorrelationId, PendingReplyImpl<Rep>> pendingReplies;
    private final AtomicReference<Flow.Subscription> subscription;
    private final String channel;
    private final String replyTopic;
    private final int replyPartition;
    private final Duration replyTimeout;
    private final String replyCorrelationIdHeader;
    private final String replyTopicHeader;
    private final String replyPartitionHeader;
    private final CorrelationIdHandler correlationIdHandler;
    private final ReplyFailureHandler replyFailureHandler;
    private final String autoOffsetReset;
    private final KafkaSource<Object, Rep> replySource;
    private final Set<TopicPartition> waitForPartitions;
    private final boolean gracefulShutdown;
    private final Duration initialAssignmentTimeout;
    private Function<Message<Rep>, Message<Rep>> replyConverter;

    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl$PendingReplyImpl.class */
    public static class PendingReplyImpl<Rep> implements PendingReply {
        private final RecordMetadata metadata;
        private final String replyTopic;
        private final int replyPartition;
        private final UniEmitter<Message<Rep>> emitter;

        public PendingReplyImpl(RecordMetadata recordMetadata, String str, int i, UniEmitter<Message<Rep>> uniEmitter) {
            this.replyTopic = str;
            this.replyPartition = i;
            this.metadata = recordMetadata;
            this.emitter = uniEmitter;
        }

        @Override // io.smallrye.reactive.messaging.kafka.reply.PendingReply
        public String replyTopic() {
            return this.replyTopic;
        }

        @Override // io.smallrye.reactive.messaging.kafka.reply.PendingReply
        public int replyPartition() {
            return this.replyPartition;
        }

        @Override // io.smallrye.reactive.messaging.kafka.reply.PendingReply
        public RecordMetadata recordMetadata() {
            return this.metadata;
        }

        public UniEmitter<Message<Rep>> getEmitter() {
            return this.emitter;
        }

        public String toString() {
            return "PendingReply{metadata=" + this.metadata + ", replyTopic='" + this.replyTopic + "', replyPartition=" + this.replyPartition + "}";
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public KafkaRequestReplyImpl(EmitterConfiguration emitterConfiguration, long j, Config config, Instance<Map<String, Object>> instance, Vertx vertx, KafkaCDIEvents kafkaCDIEvents, Instance<OpenTelemetry> instance2, Instance<KafkaCommitHandler.Factory> instance3, Instance<KafkaFailureHandler.Factory> instance4, Instance<ClientCustomizer<Map<String, Object>>> instance5, Instance<DeserializationFailureHandler<?>> instance6, Instance<CorrelationIdHandler> instance7, Instance<ReplyFailureHandler> instance8, Instance<KafkaConsumerRebalanceListener> instance9) {
        super(emitterConfiguration, j);
        this.pendingReplies = new ConcurrentHashMap();
        this.subscription = new AtomicReference<>();
        this.channel = emitterConfiguration.name();
        OverrideConnectorConfig overrideConnectorConfig = new OverrideConnectorConfig("mp.messaging.outgoing.", config, KafkaConnector.CONNECTOR_NAME, this.channel, "reply", Map.of("topic", overrideConnectorConfig2 -> {
            return ((String) overrideConnectorConfig2.getOriginalValue("topic", String.class).orElse(this.channel)) + "-replies";
        }, "assign-seek", overrideConnectorConfig3 -> {
            return overrideConnectorConfig3.getOriginalValue(KafkaRequestReply.REPLY_PARTITION_KEY, Integer.class).map((v0) -> {
                return String.valueOf(v0);
            }).orElse(null);
        }));
        KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration = new KafkaConnectorIncomingConfiguration(ConfigHelper.retrieveChannelConfiguration(instance, overrideConnectorConfig));
        this.replyTopic = kafkaConnectorIncomingConfiguration.getTopic().orElse(null);
        this.replyPartition = ((Integer) overrideConnectorConfig.getOptionalValue(KafkaRequestReply.REPLY_PARTITION_KEY, Integer.class).orElse(-1)).intValue();
        this.replyTimeout = Duration.ofMillis(((Integer) overrideConnectorConfig.getOptionalValue(KafkaRequestReply.REPLY_TIMEOUT_KEY, Integer.class).orElse(5000)).intValue());
        int intValue = ((Integer) overrideConnectorConfig.getOptionalValue(KafkaRequestReply.REPLY_INITIAL_ASSIGNMENT_TIMEOUT_KEY, Integer.class).orElse(Integer.valueOf((int) this.replyTimeout.toMillis()))).intValue();
        this.initialAssignmentTimeout = intValue < 0 ? null : Duration.ofMillis(intValue);
        this.autoOffsetReset = kafkaConnectorIncomingConfiguration.getAutoOffsetReset();
        this.replyCorrelationIdHeader = (String) overrideConnectorConfig.getOptionalValue(KafkaRequestReply.REPLY_CORRELATION_ID_HEADER_KEY, String.class).orElse(KafkaRequestReply.DEFAULT_REPLY_CORRELATION_ID_HEADER);
        this.replyTopicHeader = (String) overrideConnectorConfig.getOptionalValue(KafkaRequestReply.REPLY_TOPIC_HEADER_KEY, String.class).orElse(KafkaRequestReply.DEFAULT_REPLY_TOPIC_HEADER);
        this.replyPartitionHeader = (String) overrideConnectorConfig.getOptionalValue(KafkaRequestReply.REPLY_PARTITION_HEADER_KEY, String.class).orElse(KafkaRequestReply.DEFAULT_REPLY_PARTITION_HEADER);
        this.correlationIdHandler = (CorrelationIdHandler) CDIUtils.getInstanceById(instance7, (String) overrideConnectorConfig.getOptionalValue(KafkaRequestReply.REPLY_CORRELATION_ID_HANDLER_KEY, String.class).orElse(KafkaRequestReply.DEFAULT_CORRELATION_ID_HANDLER)).get();
        this.replyFailureHandler = (ReplyFailureHandler) overrideConnectorConfig.getOptionalValue(KafkaRequestReply.REPLY_FAILURE_HANDLER_KEY, String.class).map(str -> {
            return (ReplyFailureHandler) CDIUtils.getInstanceById(instance8, str, () -> {
                return null;
            });
        }).orElse(null);
        String orElseGet = kafkaConnectorIncomingConfiguration.getGroupId().orElseGet(() -> {
            return UUID.randomUUID().toString();
        });
        this.waitForPartitions = getWaitForPartitions(kafkaConnectorIncomingConfiguration);
        this.gracefulShutdown = kafkaConnectorIncomingConfiguration.getGracefulShutdown().booleanValue();
        this.replySource = new KafkaSource<>(vertx, orElseGet, kafkaConnectorIncomingConfiguration, instance2, instance3, instance4, instance9, kafkaCDIEvents, instance5, instance6, -1);
        if (kafkaConnectorIncomingConfiguration.getBatch().booleanValue()) {
            this.replySource.getBatchStream().call(incomingKafkaRecordBatch -> {
                UniCreate createFrom = Uni.createFrom();
                Objects.requireNonNull(incomingKafkaRecordBatch);
                return createFrom.completionStage(incomingKafkaRecordBatch::ack);
            }).flatMap(incomingKafkaRecordBatch2 -> {
                return Multi.createFrom().iterable(incomingKafkaRecordBatch2.getRecords());
            }).subscribe(this);
        } else {
            this.replySource.getStream().call(incomingKafkaRecord -> {
                UniCreate createFrom = Uni.createFrom();
                Objects.requireNonNull(incomingKafkaRecord);
                return createFrom.completionStage(incomingKafkaRecord::ack);
            }).subscribe(this);
        }
    }

    private Set<TopicPartition> getWaitForPartitions(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration) {
        Set<String> topics = KafkaSource.getTopics(kafkaConnectorIncomingConfiguration);
        Map<TopicPartition, Optional<Long>> offsetSeeks = KafkaSource.getOffsetSeeks(kafkaConnectorIncomingConfiguration.getAssignSeek().orElse(null), this.channel, topics);
        return offsetSeeks.isEmpty() ? (Set) topics.stream().map(str -> {
            return TopicPartitions.getTopicPartition(str, -1);
        }).collect(Collectors.toSet()) : offsetSeeks.keySet();
    }

    public Flow.Publisher<Message<? extends Req>> getPublisher() {
        return this.publisher.plug(multi -> {
            return (this.initialAssignmentTimeout == null || !KafkaCommitHandler.Strategy.LATEST.equals(this.autoOffsetReset)) ? multi : multi.onSubscription().call(() -> {
                return waitForAssignments().ifNoItem().after(this.initialAssignmentTimeout).fail();
            });
        }).onTermination().invoke(this::complete);
    }

    @Override // io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply
    public void complete() {
        super.complete();
        Subscriptions.cancel(this.subscription);
        if (this.gracefulShutdown) {
            for (int i = 0; !this.pendingReplies.isEmpty() && i < 10; i++) {
                grace(this.replyTimeout.dividedBy(10L));
            }
            if (!this.pendingReplies.isEmpty()) {
                KafkaLogging.log.warnf("There are still %d pending replies after the closing timeout: %s", Integer.valueOf(this.pendingReplies.size()), this.pendingReplies.keySet());
            }
        }
        this.replySource.closeQuietly();
    }

    private void grace(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override // io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply
    public Uni<Rep> request(Req req) {
        return request((Message) ContextAwareMessage.of(req)).map((v0) -> {
            return v0.getPayload();
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply
    public Uni<Message<Rep>> request(Message<Req> message) {
        OutgoingKafkaRecordMetadata.OutgoingKafkaRecordMetadataBuilder outgoingKafkaRecordMetadataBuilder = (OutgoingKafkaRecordMetadata.OutgoingKafkaRecordMetadataBuilder) message.getMetadata(OutgoingKafkaRecordMetadata.class).map(outgoingKafkaRecordMetadata -> {
            return OutgoingKafkaRecordMetadata.from(outgoingKafkaRecordMetadata);
        }).orElseGet(OutgoingKafkaRecordMetadata::builder);
        CorrelationId generate = this.correlationIdHandler.generate(message);
        outgoingKafkaRecordMetadataBuilder.addHeaders(new RecordHeader[]{new RecordHeader(this.replyCorrelationIdHeader, generate.toBytes()), new RecordHeader(this.replyTopicHeader, this.replyTopic.getBytes())});
        if (this.replyPartition != -1) {
            outgoingKafkaRecordMetadataBuilder.addHeaders(new RecordHeader[]{new RecordHeader(this.replyPartitionHeader, KafkaRequestReply.replyPartitionToBytes(this.replyPartition))});
        }
        OutgoingMessageMetadata outgoingMessageMetadata = new OutgoingMessageMetadata();
        return sendMessage(message.addMetadata(outgoingKafkaRecordMetadataBuilder.build()).addMetadata(outgoingMessageMetadata)).invoke(() -> {
            this.subscription.get().request(1L);
        }).chain(r8 -> {
            return Uni.createFrom().emitter(uniEmitter -> {
                this.pendingReplies.put(generate, new PendingReplyImpl<>((RecordMetadata) outgoingMessageMetadata.getResult(), this.replyTopic, this.replyPartition, uniEmitter));
            }).ifNoItem().after(this.replyTimeout).fail();
        }).onItemOrFailure().invoke(() -> {
            this.pendingReplies.remove(generate);
        }).plug(uni -> {
            return this.replyFailureHandler != null ? uni.onItem().transformToUni(message2 -> {
                Throwable handleReply = this.replyFailureHandler.handleReply((KafkaRecord) message2);
                return handleReply != null ? Uni.createFrom().failure(handleReply) : Uni.createFrom().item(message2);
            }) : uni;
        }).plug(uni2 -> {
            return this.replyConverter != null ? uni2.map(message2 -> {
                return this.replyConverter.apply(message2);
            }) : uni2;
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply
    public Uni<Set<TopicPartition>> waitForAssignments() {
        return this.replySource.getConsumer().runOnPollingThread(consumer -> {
            return (Set) this.waitForPartitions.stream().flatMap(topicPartition -> {
                return topicPartition.partition() == -1 ? consumer.partitionsFor(topicPartition.topic()).stream().map(partitionInfo -> {
                    return TopicPartitions.getTopicPartition(topicPartition.topic(), partitionInfo.partition());
                }) : Stream.of(topicPartition);
            }).collect(Collectors.toSet());
        }).chain(set -> {
            return waitForAssignments(set);
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply
    public Uni<Set<TopicPartition>> waitForAssignments(Collection<TopicPartition> collection) {
        return this.replySource.getConsumer().getAssignments().repeat().whilst(set -> {
            return !set.containsAll(collection);
        }).skip().where((v0) -> {
            return v0.isEmpty();
        }).toUni();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setReplyConverter(Function<Message<Rep>, Message<Rep>> function) {
        this.replyConverter = function;
    }

    @Override // io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply
    public Map<CorrelationId, PendingReply> getPendingReplies() {
        return new HashMap(this.pendingReplies);
    }

    @Override // io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply
    public KafkaConsumer<?, Rep> getConsumer() {
        return this.replySource.getConsumer();
    }

    public void onSubscribe(Flow.Subscription subscription) {
        if (Subscriptions.setIfEmpty(this.subscription, subscription)) {
            subscription.request(1L);
        }
    }

    public void onItem(KafkaRecord<?, Rep> kafkaRecord) {
        Header lastHeader = kafkaRecord.getHeaders().lastHeader(this.replyCorrelationIdHeader);
        if (lastHeader != null && kafkaRecord.getHeaders().lastHeader(this.replyTopicHeader) == null) {
            CorrelationId parse = this.correlationIdHandler.parse(lastHeader.value());
            PendingReplyImpl<Rep> remove = this.pendingReplies.remove(parse);
            if (remove != null) {
                remove.getEmitter().complete(kafkaRecord);
                return;
            }
            KafkaLogging.log.requestReplyRecordIgnored(this.channel, kafkaRecord.getTopic(), parse.toString());
        }
        this.subscription.get().request(1L);
    }

    public void onFailure(Throwable th) {
        KafkaLogging.log.requestReplyConsumerFailure(this.channel, this.replyTopic, th);
    }

    public void onCompletion() {
    }
}
