/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.requestreply;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchConsumerAwareMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.requestreply.CorrelationKey;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.util.Assert;

public class AggregatingReplyingKafkaTemplate<K, V, R>
extends ReplyingKafkaTemplate<K, V, Collection<ConsumerRecord<K, R>>>
implements BatchConsumerAwareMessageListener<K, Collection<ConsumerRecord<K, R>>> {
    public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
    public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
    private static final int DEFAULT_COMMIT_TIMEOUT = 30;
    private final Map<CorrelationKey, Set<RecordHolder<K, R>>> pending = new HashMap<CorrelationKey, Set<RecordHolder<K, R>>>();
    private final Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
    private final BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy;
    private Duration commitTimeout = Duration.ofSeconds(30L);
    private boolean returnPartialOnTimeout;

    @Deprecated
    public AggregatingReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, Collection<ConsumerRecord<K, R>>> replyContainer, Predicate<Collection<ConsumerRecord<K, R>>> releaseStrategy) {
        this(producerFactory, replyContainer, (List<ConsumerRecord<K, R>> records, Boolean timeout) -> timeout != false || releaseStrategy.test((Collection)records));
    }

    public AggregatingReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, Collection<ConsumerRecord<K, R>>> replyContainer, BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy) {
        super(producerFactory, replyContainer);
        Assert.notNull(releaseStrategy, (String)"'releaseStrategy' cannot be null");
        ContainerProperties.AckMode ackMode = replyContainer.getContainerProperties().getAckMode();
        Assert.isTrue((ackMode.equals((Object)ContainerProperties.AckMode.MANUAL) || ackMode.equals((Object)ContainerProperties.AckMode.MANUAL_IMMEDIATE) ? 1 : 0) != 0, (String)"The reply container must have a MANUAL or MANUAL_IMMEDIATE AckMode");
        this.releaseStrategy = releaseStrategy;
    }

    public void setCommitTimeout(Duration commitTimeout) {
        Assert.notNull((Object)commitTimeout, (String)"'commitTimeout' cannot be null");
        this.commitTimeout = commitTimeout;
    }

    public synchronized void setReturnPartialOnTimeout(boolean returnPartialOnTimeout) {
        this.returnPartialOnTimeout = returnPartialOnTimeout;
    }

    @Override
    public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>> data, Consumer<?, ?> consumer) {
        ArrayList completed = new ArrayList();
        data.forEach(record -> {
            Header correlation = record.headers().lastHeader("kafka_correlationId");
            if (correlation == null) {
                this.logger.error(() -> "No correlationId found in reply: " + record + " - to use request/reply semantics, the responding server must return the correlation id  in the '" + "kafka_correlationId" + "' header");
            } else {
                CorrelationKey correlationId = new CorrelationKey(correlation.value());
                AggregatingReplyingKafkaTemplate aggregatingReplyingKafkaTemplate = this;
                synchronized (aggregatingReplyingKafkaTemplate) {
                    if (this.isPending(correlationId)) {
                        List<ConsumerRecord<K, R>> list = this.addToCollection((ConsumerRecord)record, correlationId).stream().map(RecordHolder::getRecord).collect(Collectors.toList());
                        if (this.releaseStrategy.test(list, false)) {
                            ConsumerRecord done = new ConsumerRecord(AGGREGATED_RESULTS_TOPIC, 0, 0L, null, list);
                            done.headers().add((Header)new RecordHeader("kafka_correlationId", correlationId.getCorrelationId()));
                            this.pending.remove(correlationId);
                            this.checkOffsetsAndCommitIfNecessary(list, consumer);
                            completed.add(done);
                        }
                    } else {
                        this.logLateArrival(record, correlationId);
                    }
                }
            }
        });
        if (completed.size() > 0) {
            super.onMessage(completed);
        }
    }

    @Override
    protected synchronized boolean handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K, V, Collection<ConsumerRecord<K, R>>> future) {
        List list;
        Set<RecordHolder<K, R>> removed = this.pending.remove(correlationId);
        if (removed != null && this.returnPartialOnTimeout && this.releaseStrategy.test(list = removed.stream().map(RecordHolder::getRecord).collect(Collectors.toList()), true)) {
            future.set(new ConsumerRecord(PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC, 0, 0L, null, list));
            return true;
        }
        return false;
    }

    private void checkOffsetsAndCommitIfNecessary(List<ConsumerRecord<K, R>> list, Consumer<?, ?> consumer) {
        list.forEach(record -> this.offsets.compute(new TopicPartition(record.topic(), record.partition()), (k, v) -> v == null ? record.offset() + 1L : Math.max(v, record.offset() + 1L)));
        if (this.pending.isEmpty() && !this.offsets.isEmpty()) {
            consumer.commitSync(this.offsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(((Long)entry.getValue()).longValue()))), this.commitTimeout);
            this.offsets.clear();
        }
    }

    private Set<RecordHolder<K, R>> addToCollection(ConsumerRecord record, CorrelationKey correlationId) {
        Set set = this.pending.computeIfAbsent(correlationId, id -> new LinkedHashSet());
        set.add(new RecordHolder(record));
        return set;
    }

    private static final class RecordHolder<K, R> {
        private final ConsumerRecord<K, R> record;

        RecordHolder(ConsumerRecord<K, R> record) {
            this.record = record;
        }

        ConsumerRecord<K, R> getRecord() {
            return this.record;
        }

        public int hashCode() {
            int prime = 31;
            int result = 1;
            result = 31 * result + this.record.topic().hashCode() + this.record.partition() + (int)this.record.offset();
            return result;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (this.getClass() != obj.getClass()) {
                return false;
            }
            RecordHolder other = (RecordHolder)obj;
            if (this.record == null) {
                if (other.record != null) {
                    return false;
                }
            } else {
                return this.record.topic().equals(other.record.topic()) && this.record.partition() == other.record.partition() && this.record.offset() == other.record.offset();
            }
            return false;
        }
    }
}

