package org.apache.beam.sdk.io.kafka;

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.class */
public abstract class ReadFromKafkaDoFn<K, V> extends DoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> {
    // Extra consumer settings handed to KafkaIOUtils.getOffsetConsumerConfig when building the
    // consumer that backs the end-of-range estimator (see restrictionTracker).
    private final Map<String, Object> offsetConsumerConfig;
    // Optional predicate; when it returns true for a partition, processElement stops reading it.
    private final SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
    // Builds a Kafka consumer from a configuration map.
    private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
    // Derives the output timestamp of a record when no timestamp policy factory is configured.
    private final SerializableFunction<KafkaRecord<K, V>, Instant> extractOutputTimestampFn;
    // Builds the watermark estimator from the (bounds-clamped) estimator state.
    private final SerializableFunction<Instant, WatermarkEstimator<Instant>> createWatermarkEstimatorFn;
    // Optional; when non-null, processElement uses it for record timestamps and watermarks.
    private final TimestampPolicyFactory<K, V> timestampPolicyFactory;
    // The three transient fields below are populated in setup().
    private transient Deserializer<K> keyDeserializerInstance;
    private transient Deserializer<V> valueDeserializerInstance;
    // Per-partition running averages of record size and offset gap, used by getSize().
    private transient LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;

    // Supplies the key deserializer instantiated in setup().
    @VisibleForTesting
    final DeserializerProvider keyDeserializerProvider;

    // Supplies the value deserializer instantiated in setup().
    @VisibleForTesting
    final DeserializerProvider valueDeserializerProvider;

    // Base consumer configuration; bootstrap servers may be overridden per source descriptor.
    @VisibleForTesting
    final Map<String, Object> consumerConfig;
    private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
    // Timeout passed to each Consumer.poll call in processElement.
    private static final Duration KAFKA_POLL_TIMEOUT = Duration.ofSeconds(1);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn$AverageRecordSize.class */
    public static class AverageRecordSize {
        /** Running average of the byte size passed to {@link #update}. */
        private final KafkaIOUtils.MovingAvg avgRecordSize = new KafkaIOUtils.MovingAvg();
        /** Running average of the offset gap passed to {@link #update}. */
        private final KafkaIOUtils.MovingAvg avgRecordGap = new KafkaIOUtils.MovingAvg();

        /**
         * Feeds one observation into both running averages.
         *
         * @param i size sample (processElement passes key length plus value length, in bytes)
         * @param j gap sample (processElement passes the distance between the record's offset
         *     and the offset that was expected next)
         */
        public void update(int i, long j) {
            avgRecordSize.update(i);
            avgRecordGap.update(j);
        }

        /**
         * Scales {@code d} by the average size and divides by one plus the average gap.
         *
         * @param d quantity to scale (getSize passes the tracker's remaining work)
         * @return {@code avgSize * d / (1 + avgGap)}
         */
        public double getTotalSize(double d) {
            double meanSize = avgRecordSize.get();
            double offsetsPerRecord = 1.0d + avgRecordGap.get();
            return (meanSize * d) / offsetsPerRecord;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Variant of {@link ReadFromKafkaDoFn} annotated {@code @DoFn.BoundedPerElement}.
     *
     * <p>Adds no behavior of its own; {@code create} selects it when the read configuration
     * reports {@code isBounded()}.
     */
    @DoFn.BoundedPerElement
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn$Bounded.class */
    public static class Bounded<K, V> extends ReadFromKafkaDoFn<K, V> {
        // Delegates all initialization to the shared base constructor.
        Bounded(KafkaIO.ReadSourceDescriptors readSourceDescriptors) {
            super(readSourceDescriptors);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn$KafkaLatestOffsetEstimator.class */
    public static class KafkaLatestOffsetEstimator implements GrowableOffsetRangeTracker.RangeEndEstimator {
        private final Consumer<byte[], byte[]> offsetConsumer;
        private final TopicPartition topicPartition;
        private final Supplier<Long> memoizedBacklog;

        KafkaLatestOffsetEstimator(Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
            this.offsetConsumer = consumer;
            this.topicPartition = topicPartition;
            ConsumerSpEL.evaluateAssign(this.offsetConsumer, ImmutableList.of(this.topicPartition));
            this.memoizedBacklog = Suppliers.memoizeWithExpiration(() -> {
                ConsumerSpEL.evaluateSeek2End(consumer, topicPartition);
                return Long.valueOf(consumer.position(topicPartition));
            }, 1L, TimeUnit.SECONDS);
        }

        protected void finalize() {
            try {
                Closeables.close(this.offsetConsumer, true);
            } catch (Exception e) {
                ReadFromKafkaDoFn.LOG.warn("Failed to close offset consumer for {}", this.topicPartition);
            }
        }

        public long estimate() {
            return ((Long) this.memoizedBacklog.get()).longValue();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Variant of {@link ReadFromKafkaDoFn} annotated {@code @DoFn.UnboundedPerElement}.
     *
     * <p>Adds no behavior of its own; {@code create} selects it when the read configuration
     * does not report {@code isBounded()}.
     */
    @DoFn.UnboundedPerElement
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn$Unbounded.class */
    public static class Unbounded<K, V> extends ReadFromKafkaDoFn<K, V> {
        // Delegates all initialization to the shared base constructor.
        Unbounded(KafkaIO.ReadSourceDescriptors readSourceDescriptors) {
            super(readSourceDescriptors);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the DoFn variant that matches the boundedness of the read configuration.
     *
     * @param readSourceDescriptors read configuration to copy settings from
     * @return a {@link Bounded} instance when {@code readSourceDescriptors.isBounded()} is true,
     *     otherwise an {@link Unbounded} instance
     */
    public static <K, V> ReadFromKafkaDoFn<K, V> create(KafkaIO.ReadSourceDescriptors readSourceDescriptors) {
        // The diamond lets the compiler infer <K, V> from the return type instead of
        // instantiating raw types and relying on an unchecked conversion.
        if (readSourceDescriptors.isBounded()) {
            return new Bounded<>(readSourceDescriptors);
        }
        return new Unbounded<>(readSourceDescriptors);
    }

    /**
     * Copies every setting this DoFn needs out of the read configuration.
     *
     * <p>Private: instances are obtained through {@code create}, which picks the bounded or
     * unbounded subclass.
     *
     * @param readSourceDescriptors read configuration; its key and value deserializer providers
     *     must be non-null
     */
    private ReadFromKafkaDoFn(KafkaIO.ReadSourceDescriptors readSourceDescriptors) {
        // Deserializer instances are created later, in setup().
        keyDeserializerInstance = null;
        valueDeserializerInstance = null;

        // Consumer configuration.
        consumerConfig = readSourceDescriptors.getConsumerConfig();
        offsetConsumerConfig = readSourceDescriptors.getOffsetConsumerConfig();

        // Deserializer providers are mandatory.
        keyDeserializerProvider = Preconditions.checkArgumentNotNull(readSourceDescriptors.getKeyDeserializerProvider());
        valueDeserializerProvider = Preconditions.checkArgumentNotNull(readSourceDescriptors.getValueDeserializerProvider());

        // Pluggable functions and policies.
        consumerFactoryFn = readSourceDescriptors.getConsumerFactoryFn();
        extractOutputTimestampFn = readSourceDescriptors.getExtractOutputTimestampFn();
        createWatermarkEstimatorFn = readSourceDescriptors.getCreateWatermarkEstimatorFn();
        timestampPolicyFactory = readSourceDescriptors.getTimestampPolicyFactory();
        checkStopReadingFn = readSourceDescriptors.getCheckStopReadingFn();
    }

    /**
     * Computes the offset range to read for a source descriptor.
     *
     * <p>Start of the range, in priority order: the descriptor's explicit start offset, the
     * offset resolved from its start read time, or the consumer's current position for the
     * partition. End of the range: the descriptor's explicit stop offset, the offset resolved
     * from its stop read time, or {@code Long.MAX_VALUE} when neither is set.
     *
     * @param kafkaSourceDescriptor describes the topic partition and optional start/stop bounds
     * @return the initial {@link OffsetRange} for the partition
     */
    @DoFn.GetInitialRestriction
    public OffsetRange initialRestriction(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor) {
        // try-with-resources closes the consumer on every path and, if the body has already
        // failed, attaches a close() failure as a suppressed exception instead of replacing it.
        try (Consumer<byte[], byte[]> consumer = this.consumerFactoryFn.apply(overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor))) {
            TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
            ConsumerSpEL.evaluateAssign(consumer, ImmutableList.of(topicPartition));

            Instant startReadTime = kafkaSourceDescriptor.getStartReadTime();
            Long startReadOffset = kafkaSourceDescriptor.getStartReadOffset();
            long startOffset;
            if (startReadOffset != null) {
                startOffset = startReadOffset;
            } else if (startReadTime != null) {
                startOffset = ConsumerSpEL.offsetForTime(consumer, topicPartition, startReadTime);
            } else {
                startOffset = consumer.position(topicPartition);
            }

            Instant stopReadTime = kafkaSourceDescriptor.getStopReadTime();
            Long stopReadOffset = kafkaSourceDescriptor.getStopReadOffset();
            // Long.MAX_VALUE marks an open-ended range; restrictionTracker checks for it.
            long endOffset = Long.MAX_VALUE;
            if (stopReadOffset != null) {
                endOffset = stopReadOffset;
            } else if (stopReadTime != null) {
                endOffset = ConsumerSpEL.offsetForTime(consumer, topicPartition, stopReadTime);
            }

            return new OffsetRange(startOffset, endOffset);
        }
    }

    /**
     * Uses the element's timestamp, unchanged, as the initial watermark estimator state.
     *
     * @param instant timestamp injected via {@code @DoFn.Timestamp}
     * @return {@code instant} as received
     */
    @DoFn.GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp Instant instant) {
        return instant;
    }

    /**
     * Creates the watermark estimator from the stored estimator state.
     *
     * @param instant watermark estimator state; clamped into the valid window timestamp range
     *     before it is handed to the configured factory function
     * @return the estimator produced by {@code createWatermarkEstimatorFn}
     * @throws IllegalStateException if no factory function was configured
     */
    @DoFn.NewWatermarkEstimator
    public WatermarkEstimator<Instant> newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
        SerializableFunction<Instant, WatermarkEstimator<Instant>> estimatorFactory = Preconditions.checkStateNotNull(this.createWatermarkEstimatorFn);
        Instant clampedState = ensureTimestampWithinBounds(instant);
        return estimatorFactory.apply(clampedState);
    }

    /**
     * Estimates the size of the remaining work for a restriction.
     *
     * <p>Takes the remaining work reported by a fresh tracker for the restriction. If size
     * statistics already exist for the partition, the value is scaled through
     * {@link AverageRecordSize#getTotalSize}; otherwise the raw remaining work is returned.
     *
     * @param kafkaSourceDescriptor source whose partition is being sized
     * @param offsetRange restriction to size
     * @return the estimated size
     * @throws Exception if the statistics cache fails to load an entry
     */
    @DoFn.GetSize
    public double getSize(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction OffsetRange offsetRange) throws Exception {
        LoadingCache<TopicPartition, AverageRecordSize> sizeStats = Preconditions.checkStateNotNull(this.avgRecordSize);
        double remainingWork = restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining();
        TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
        // Probe through asMap() so that a missing entry is not created as a side effect.
        if (sizeStats.asMap().containsKey(partition)) {
            return sizeStats.get(partition).getTotalSize(remainingWork);
        }
        return remainingWork;
    }

    /**
     * Creates the tracker for a restriction.
     *
     * <p>A restriction whose end is below {@code Long.MAX_VALUE} gets a plain
     * {@link OffsetRangeTracker}. An open-ended restriction gets a
     * {@link GrowableOffsetRangeTracker} whose end is estimated by a
     * {@link KafkaLatestOffsetEstimator} backed by a newly created offset consumer.
     *
     * @param kafkaSourceDescriptor source the restriction belongs to
     * @param offsetRange restriction to track
     * @return the tracker for {@code offsetRange}
     */
    @DoFn.NewTracker
    public OffsetRangeTracker restrictionTracker(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, @DoFn.Restriction OffsetRange offsetRange) {
        boolean hasFixedEnd = offsetRange.getTo() < Long.MAX_VALUE;
        if (hasFixedEnd) {
            return new OffsetRangeTracker(offsetRange);
        }

        TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
        Map<String, Object> baseConfig = overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor);
        Map<String, Object> offsetConfig = KafkaIOUtils.getOffsetConsumerConfig("tracker-" + partition, this.offsetConsumerConfig, baseConfig);
        Consumer<byte[], byte[]> offsetConsumer = this.consumerFactoryFn.apply(offsetConfig);
        KafkaLatestOffsetEstimator rangeEndEstimator = new KafkaLatestOffsetEstimator(offsetConsumer, partition);
        return new GrowableOffsetRangeTracker(offsetRange.getFrom(), rangeEndEstimator);
    }

    /**
     * Reads records of one topic partition, starting at the beginning of the current restriction,
     * and emits each as a {@code KV} of source descriptor and {@link KafkaRecord}.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>{@code stop()} if {@code checkStopReadingFn} says the partition should no longer be
     *       read, if the partition is not listed by the consumer, or if the tracker refuses to
     *       claim a record's offset;
     *   <li>{@code resume()} as soon as a poll returns no records.
     * </ul>
     *
     * @param kafkaSourceDescriptor source to read from
     * @param restrictionTracker tracker for the offset range being processed
     * @param watermarkEstimator estimator for the element; must be a
     *     {@link ManualWatermarkEstimator} when a timestamp policy factory is configured
     * @param outputReceiver receives the decoded records with their output timestamps
     * @return whether processing of this restriction should stop or resume later
     * @throws IllegalStateException if {@code setup()} has not populated the transient state
     */
    @DoFn.ProcessElement
    public DoFn.ProcessContinuation processElement(@DoFn.Element KafkaSourceDescriptor kafkaSourceDescriptor, RestrictionTracker<OffsetRange, Long> restrictionTracker, WatermarkEstimator watermarkEstimator, DoFn.OutputReceiver<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> outputReceiver) {
        final LoadingCache<TopicPartition, AverageRecordSize> recordSizeStats = Preconditions.checkStateNotNull(this.avgRecordSize);
        final Deserializer<K> keyDeserializer = Preconditions.checkStateNotNull(this.keyDeserializerInstance);
        final Deserializer<V> valueDeserializer = Preconditions.checkStateNotNull(this.valueDeserializerInstance);
        final TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();

        if (this.checkStopReadingFn != null && this.checkStopReadingFn.apply(topicPartition)) {
            // Claim the last offset of the restriction before stopping; the claim's result is
            // deliberately ignored. NOTE(review): presumably this marks the range as fully
            // processed - confirm against the tracker's contract.
            restrictionTracker.tryClaim(restrictionTracker.currentRestriction().getTo() - 1);
            return DoFn.ProcessContinuation.stop();
        }

        Map<String, Object> updatedConsumerConfig = overrideBootstrapServersConfig(this.consumerConfig, kafkaSourceDescriptor);
        TimestampPolicy<K, V> timestampPolicy = null;
        if (this.timestampPolicyFactory != null) {
            timestampPolicy = this.timestampPolicyFactory.createTimestampPolicy(topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
        }

        // try-with-resources closes the consumer on every exit path and keeps the primary
        // exception if close() also fails (the close failure is attached as suppressed).
        try (Consumer<byte[], byte[]> consumer = this.consumerFactoryFn.apply(updatedConsumerConfig)) {
            // Stop if the partition is not among those the consumer can list.
            boolean partitionExists = consumer.listTopics().values().stream()
                    .flatMap(List::stream)
                    .anyMatch(info -> topicPartition.equals(new TopicPartition(info.topic(), info.partition())));
            if (!partitionExists) {
                return DoFn.ProcessContinuation.stop();
            }

            ConsumerSpEL.evaluateAssign(consumer, ImmutableList.of(topicPartition));
            long startOffset = restrictionTracker.currentRestriction().getFrom();
            // Offset the next record would have if there were no gap; used for gap statistics.
            long expectedOffset = startOffset;
            consumer.seek(topicPartition, startOffset);

            while (true) {
                ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(KAFKA_POLL_TIMEOUT);
                // Nothing available within the poll timeout: yield and let the runner resume later.
                if (rawRecords.isEmpty()) {
                    return DoFn.ProcessContinuation.resume();
                }
                for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
                    if (!restrictionTracker.tryClaim(rawRecord.offset())) {
                        return DoFn.ProcessContinuation.stop();
                    }
                    KafkaRecord<K, V> kafkaRecord = new KafkaRecord<>(
                            rawRecord.topic(),
                            rawRecord.partition(),
                            rawRecord.offset(),
                            ConsumerSpEL.getRecordTimestamp(rawRecord),
                            ConsumerSpEL.getRecordTimestampType(rawRecord),
                            ConsumerSpEL.hasHeaders() ? rawRecord.headers() : null,
                            ConsumerSpEL.deserializeKey(keyDeserializer, rawRecord),
                            ConsumerSpEL.deserializeValue(valueDeserializer, rawRecord));

                    int keyBytes = rawRecord.key() == null ? 0 : rawRecord.key().length;
                    int valueBytes = rawRecord.value() == null ? 0 : rawRecord.value().length;
                    recordSizeStats.getUnchecked(topicPartition).update(keyBytes + valueBytes, rawRecord.offset() - expectedOffset);
                    expectedOffset = rawRecord.offset() + 1;

                    Instant outputTimestamp;
                    if (timestampPolicy != null) {
                        // The policy drives the watermark, which requires a manually settable estimator.
                        org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(watermarkEstimator instanceof ManualWatermarkEstimator);
                        KafkaUnboundedReader.TimestampPolicyContext policyContext = new KafkaUnboundedReader.TimestampPolicyContext((long) ((RestrictionTracker.HasProgress) restrictionTracker).getProgress().getWorkRemaining(), Instant.now());
                        outputTimestamp = timestampPolicy.getTimestampForRecord(policyContext, kafkaRecord);
                        ((ManualWatermarkEstimator<?>) watermarkEstimator).setWatermark(ensureTimestampWithinBounds(timestampPolicy.getWatermark(policyContext)));
                    } else {
                        Preconditions.checkStateNotNull(this.extractOutputTimestampFn);
                        outputTimestamp = this.extractOutputTimestampFn.apply(kafkaRecord);
                    }
                    outputReceiver.outputWithTimestamp(KV.of(kafkaSourceDescriptor, kafkaRecord), outputTimestamp);
                }
            }
        }
    }

    /**
     * Supplies the coder for this DoFn's {@link OffsetRange} restrictions.
     *
     * @return a new {@code OffsetRange.Coder}
     */
    @DoFn.GetRestrictionCoder
    public Coder<OffsetRange> restrictionCoder() {
        return new OffsetRange.Coder();
    }

    /**
     * Initializes the transient state: the per-partition size statistics cache (capped at 1000
     * entries) and the key and value deserializers.
     *
     * @throws Exception if a deserializer cannot be created
     */
    @DoFn.Setup
    public void setup() throws Exception {
        // Each partition starts with empty statistics the first time it is looked up.
        CacheLoader<TopicPartition, AverageRecordSize> emptyStatsLoader = new CacheLoader<TopicPartition, AverageRecordSize>() {
            @Override
            public AverageRecordSize load(TopicPartition topicPartition) throws Exception {
                return new AverageRecordSize();
            }
        };
        this.avgRecordSize = CacheBuilder.newBuilder().maximumSize(1000L).build(emptyStatsLoader);

        // The boolean flag tells the provider whether the deserializer is for keys (true) or values (false).
        this.keyDeserializerInstance = this.keyDeserializerProvider.getDeserializer(this.consumerConfig, true);
        this.valueDeserializerInstance = this.valueDeserializerProvider.getDeserializer(this.consumerConfig, false);
    }

    /**
     * Closes the key and value deserializers.
     *
     * <p>Each deserializer is closed independently, so a failure closing one does not prevent the
     * other from being closed. A deserializer that was never created (for example because
     * {@code setup()} did not complete) is skipped rather than causing teardown itself to fail.
     *
     * @throws Exception declared for lifecycle-signature compatibility; close failures are logged
     *     rather than propagated
     */
    @DoFn.Teardown
    public void teardown() throws Exception {
        closeDeserializerQuietly(this.keyDeserializerInstance);
        closeDeserializerQuietly(this.valueDeserializerInstance);
    }

    /** Closes one deserializer (null is a no-op), logging instead of propagating any failure. */
    private static void closeDeserializerQuietly(Deserializer<?> deserializer) {
        try {
            Closeables.close(deserializer, true);
        } catch (Exception e) {
            LOG.warn("Fail to close resource during finishing bundle.", e);
        }
    }

    /**
     * Returns a copy of a consumer configuration in which {@code bootstrap.servers} is replaced
     * by the source descriptor's servers, when the descriptor lists at least one.
     *
     * @param map base consumer configuration; not modified
     * @param kafkaSourceDescriptor descriptor that may carry its own bootstrap servers
     * @return a new map with the effective configuration
     * @throws IllegalStateException if neither the base configuration nor the descriptor provides
     *     bootstrap servers
     */
    private Map<String, Object> overrideBootstrapServersConfig(Map<String, Object> map, KafkaSourceDescriptor kafkaSourceDescriptor) {
        List<String> descriptorServers = kafkaSourceDescriptor.getBootStrapServers();
        boolean serversAvailable = map.containsKey("bootstrap.servers") || descriptorServers != null;
        org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(serversAvailable);

        Map<String, Object> effectiveConfig = new HashMap<>(map);
        if (descriptorServers != null && !descriptorServers.isEmpty()) {
            effectiveConfig.put("bootstrap.servers", String.join(",", descriptorServers));
        }
        return effectiveConfig;
    }

    /**
     * Clamps a timestamp into the range
     * [{@code BoundedWindow.TIMESTAMP_MIN_VALUE}, {@code BoundedWindow.TIMESTAMP_MAX_VALUE}].
     *
     * @param instant timestamp to clamp
     * @return the nearest bound if {@code instant} lies outside the range, otherwise
     *     {@code instant} itself
     */
    private static Instant ensureTimestampWithinBounds(Instant instant) {
        if (instant.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            return BoundedWindow.TIMESTAMP_MIN_VALUE;
        }
        if (instant.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            return BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
        return instant;
    }

    /**
     * Closes a resource, optionally on behalf of an in-flight exception.
     *
     * <p>Marked synthetic by the decompiler; its shape matches the helper that javac emits for
     * try-with-resources statements.
     *
     * <p>NOTE(review): {@code AutoCloseable.close()} declares {@code throws Exception}, but this
     * method has no {@code throws} clause - confirm this recompiles as written.
     *
     * @param th the exception already being propagated, or {@code null} if none
     * @param autoCloseable the resource to close
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            // No primary exception: a failure from close() propagates to the caller.
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            // Keep the primary exception; record the close failure as suppressed.
            th.addSuppressed(th2);
        }
    }
}
