package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.class */
public abstract class AbstractFetcher<T, KPH> {
    // Codes stored in timestampWatermarkMode. NOTE: this file is decompiler output, and the
    // same three names are also substituted elsewhere for the plain literals 0, 1 and 2
    // (e.g. as loop start/increment values), where they carry no watermark meaning.

    /** Mode selected when neither a periodic nor a punctuated assigner was supplied. */
    private static final int NO_TIMESTAMPS_WATERMARKS = 0;
    /** Mode selected when only a periodic watermark assigner was supplied. */
    private static final int PERIODIC_WATERMARKS = 1;
    /** Mode selected when only a punctuated watermark assigner was supplied. */
    private static final int PUNCTUATED_WATERMARKS = 2;
    /** Context through which records and watermarks are emitted. */
    private final SourceFunction.SourceContext<T> sourceContext;
    /** Lock obtained from the source context; held while emitting records and updating offsets. */
    private final Object checkpointLock;
    /** State for every partition handled by this fetcher; created once in the constructor. */
    private final KafkaTopicPartitionState<KPH>[] allPartitions;
    /** One of the three mode constants above; decides which emit path emitRecord takes. */
    private final int timestampWatermarkMode;
    /** Constructor flag; not read in this class (presumably consulted by subclasses). */
    protected final boolean useMetrics;
    /**
     * Largest watermark emitted so far in punctuated mode. Written only while holding
     * checkpointLock in updateMinPunctuatedWatermark, but also read there without the lock,
     * hence volatile.
     */
    private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
    /** Compiler-synthesized assertion flag, set in the static initializer; true when assertions are off. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1.class */
    /**
     * Compiler-synthesized lookup table for a {@code switch} over {@link OffsetGaugeType}.
     * The array is indexed by enum ordinal and holds the case label number used by the
     * switch in OffsetGauge: 1 for COMMITTED_OFFSET, 2 for CURRENT_OFFSET, 0 for anything
     * not listed.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$connectors$kafka$internals$AbstractFetcher$OffsetGaugeType = new int[OffsetGaugeType.values().length];

        static {
            // Each slot is filled in its own try block; a NoSuchFieldError from a missing
            // enum constant is deliberately ignored, which leaves that slot at 0.
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$internals$AbstractFetcher$OffsetGaugeType[OffsetGaugeType.COMMITTED_OFFSET.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$internals$AbstractFetcher$OffsetGaugeType[OffsetGaugeType.CURRENT_OFFSET.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$OffsetGauge.class */
    private static class OffsetGauge implements Gauge<Long> {
        private final KafkaTopicPartitionState ktp;
        private final OffsetGaugeType gaugeType;

        public OffsetGauge(KafkaTopicPartitionState kafkaTopicPartitionState, OffsetGaugeType offsetGaugeType) {
            this.ktp = kafkaTopicPartitionState;
            this.gaugeType = offsetGaugeType;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m5getValue() {
            switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$connectors$kafka$internals$AbstractFetcher$OffsetGaugeType[this.gaugeType.ordinal()]) {
                case AbstractFetcher.PERIODIC_WATERMARKS /* 1 */:
                    return Long.valueOf(this.ktp.getCommittedOffset());
                case AbstractFetcher.PUNCTUATED_WATERMARKS /* 2 */:
                    return Long.valueOf(this.ktp.getOffset());
                default:
                    throw new RuntimeException("Unknown gauge type: " + this.gaugeType);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$OffsetGaugeType.class */
    /**
     * Selects which offset an OffsetGauge reports. The constants' ordinals index the
     * synthetic switch table in AnonymousClass1, so their order should not be changed casually.
     */
    public enum OffsetGaugeType {
        /** Report the value of KafkaTopicPartitionState.getOffset(). */
        CURRENT_OFFSET,
        /** Report the value of KafkaTopicPartitionState.getCommittedOffset(). */
        COMMITTED_OFFSET
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$PeriodicWatermarkEmitter.class */
    private static class PeriodicWatermarkEmitter implements Triggerable {
        private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
        private final SourceFunction.SourceContext<?> emitter;
        private final StreamingRuntimeContext triggerContext;
        private final long interval;
        private long lastWatermarkTimestamp = Long.MIN_VALUE;
        static final /* synthetic */ boolean $assertionsDisabled;

        PeriodicWatermarkEmitter(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] kafkaTopicPartitionStateWithPeriodicWatermarksArr, SourceFunction.SourceContext<?> sourceContext, StreamingRuntimeContext streamingRuntimeContext) {
            this.allPartitions = (KafkaTopicPartitionStateWithPeriodicWatermarks[]) Preconditions.checkNotNull(kafkaTopicPartitionStateWithPeriodicWatermarksArr);
            this.emitter = (SourceFunction.SourceContext) Preconditions.checkNotNull(sourceContext);
            this.triggerContext = (StreamingRuntimeContext) Preconditions.checkNotNull(streamingRuntimeContext);
            this.interval = streamingRuntimeContext.getExecutionConfig().getAutoWatermarkInterval();
        }

        public void start() {
            this.triggerContext.registerTimer(this.triggerContext.getCurrentProcessingTime() + this.interval, this);
        }

        public void trigger(long j) throws Exception {
            long currentWatermarkTimestamp;
            if (!$assertionsDisabled && !Thread.holdsLock(this.emitter.getCheckpointLock())) {
                throw new AssertionError();
            }
            long j2 = Long.MAX_VALUE;
            KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] kafkaTopicPartitionStateWithPeriodicWatermarksArr = this.allPartitions;
            int length = kafkaTopicPartitionStateWithPeriodicWatermarksArr.length;
            for (int i = AbstractFetcher.NO_TIMESTAMPS_WATERMARKS; i < length; i += AbstractFetcher.PERIODIC_WATERMARKS) {
                KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> kafkaTopicPartitionStateWithPeriodicWatermarks = kafkaTopicPartitionStateWithPeriodicWatermarksArr[i];
                synchronized (kafkaTopicPartitionStateWithPeriodicWatermarks) {
                    currentWatermarkTimestamp = kafkaTopicPartitionStateWithPeriodicWatermarks.getCurrentWatermarkTimestamp();
                }
                j2 = Math.min(j2, currentWatermarkTimestamp);
            }
            if (j2 > this.lastWatermarkTimestamp) {
                this.lastWatermarkTimestamp = j2;
                this.emitter.emitWatermark(new Watermark(j2));
            }
            this.triggerContext.registerTimer(this.triggerContext.getCurrentProcessingTime() + this.interval, this);
        }

        static {
            $assertionsDisabled = !AbstractFetcher.class.desiredAssertionStatus();
        }
    }

    protected AbstractFetcher(SourceFunction.SourceContext<T> sourceContext, List<KafkaTopicPartition> list, SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue, SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2, StreamingRuntimeContext streamingRuntimeContext, boolean z) throws Exception {
        this.sourceContext = (SourceFunction.SourceContext) Preconditions.checkNotNull(sourceContext);
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.useMetrics = z;
        if (serializedValue == null) {
            if (serializedValue2 == null) {
                this.timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
            } else {
                this.timestampWatermarkMode = PUNCTUATED_WATERMARKS;
            }
        } else {
            if (serializedValue2 != null) {
                throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
            }
            this.timestampWatermarkMode = PERIODIC_WATERMARKS;
        }
        this.allPartitions = initializePartitions(list, this.timestampWatermarkMode, serializedValue, serializedValue2, streamingRuntimeContext.getUserCodeClassLoader());
        if (this.timestampWatermarkMode == PERIODIC_WATERMARKS) {
            new PeriodicWatermarkEmitter((KafkaTopicPartitionStateWithPeriodicWatermarks[]) this.allPartitions, sourceContext, streamingRuntimeContext).start();
        }
    }

    /**
     * Gives subclasses access to the state of all partitions handled by this fetcher.
     *
     * @return the internal partition state array itself, not a copy
     */
    protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
        return allPartitions;
    }

    /**
     * Runs the fetcher's main loop. Implemented by subclasses; presumably reads records
     * from Kafka and hands them to emitRecord (not visible in this class).
     *
     * @throws Exception if fetching fails
     */
    public abstract void runFetchLoop() throws Exception;

    /** Requests that the fetcher stop. Implemented by subclasses. */
    public abstract void cancel();

    /**
     * Creates the subclass-specific handle for a partition. NOTE: this is invoked from
     * initializePartitions, which the constructor of this class calls, so implementations
     * run before the subclass constructor body has executed.
     *
     * @param kafkaTopicPartition the partition to create a handle for
     * @return the handle stored in the partition's state object
     */
    public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition kafkaTopicPartition);

    /**
     * Commits the given offsets to Kafka. Implemented by subclasses.
     *
     * @param map offset to commit per partition
     * @throws Exception if the commit fails
     */
    public abstract void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> map) throws Exception;

    /**
     * Captures the current offset of every partition that has a defined offset.
     * With assertions enabled, the caller must hold the checkpoint lock.
     *
     * @return a new map from partition to its current offset; partitions without a
     *         defined offset are left out
     */
    public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
        // Hand-expanded form of: assert Thread.holdsLock(checkpointLock)
        if (!($assertionsDisabled || Thread.holdsLock(this.checkpointLock))) {
            throw new AssertionError();
        }

        final HashMap<KafkaTopicPartition, Long> offsetsByPartition = new HashMap<>(this.allPartitions.length);
        for (KafkaTopicPartitionState<KPH> partitionState : subscribedPartitions()) {
            if (!partitionState.isOffsetDefined()) {
                continue;
            }
            offsetsByPartition.put(partitionState.getKafkaTopicPartition(), Long.valueOf(partitionState.getOffset()));
        }
        return offsetsByPartition;
    }

    /**
     * Applies previously snapshotted offsets to the matching partition states.
     * Partitions with no entry in the map keep their current offset.
     *
     * @param hashMap offsets keyed by partition, e.g. as produced by snapshotCurrentState
     */
    public void restoreOffsets(HashMap<KafkaTopicPartition, Long> hashMap) {
        for (KafkaTopicPartitionState<KPH> partitionState : this.allPartitions) {
            final Long restoredOffset = hashMap.get(partitionState.getKafkaTopicPartition());
            if (restoredOffset == null) {
                continue;
            }
            partitionState.setOffset(restoredOffset.longValue());
        }
    }

    /**
     * Emits one record and advances the partition's offset, choosing the emit path from
     * the configured timestamp/watermark mode.
     *
     * @param t the record to emit
     * @param kafkaTopicPartitionState state of the partition the record came from
     * @param j the record's offset, stored as the partition's new offset
     */
    protected final void emitRecord(T t, KafkaTopicPartitionState<KPH> kafkaTopicPartitionState, long j) {
        switch (this.timestampWatermarkMode) {
            case NO_TIMESTAMPS_WATERMARKS:
                // Emit and update the offset inside one critical section on the checkpoint lock.
                synchronized (this.checkpointLock) {
                    this.sourceContext.collect(t);
                    kafkaTopicPartitionState.setOffset(j);
                }
                break;
            case PERIODIC_WATERMARKS:
                emitRecordWithTimestampAndPeriodicWatermark(t, kafkaTopicPartitionState, j);
                break;
            default:
                emitRecordWithTimestampAndPunctuatedWatermark(t, kafkaTopicPartitionState, j);
                break;
        }
    }

    /**
     * Periodic-mode emit path: extracts the record's timestamp from the partition state,
     * then emits the record with that timestamp and advances the offset under the
     * checkpoint lock.
     *
     * @param t the record to emit
     * @param kafkaTopicPartitionState partition state; must be the periodic-watermark variant
     * @param j the record's offset
     */
    private void emitRecordWithTimestampAndPeriodicWatermark(T t, KafkaTopicPartitionState<KPH> kafkaTopicPartitionState, long j) {
        final KafkaTopicPartitionStateWithPeriodicWatermarks periodicState = (KafkaTopicPartitionStateWithPeriodicWatermarks) kafkaTopicPartitionState;

        final long recordTimestamp;
        // Same monitor the PeriodicWatermarkEmitter takes when reading this partition's watermark.
        synchronized (periodicState) {
            recordTimestamp = periodicState.getTimestampForRecord(t);
        }

        synchronized (this.checkpointLock) {
            this.sourceContext.collectWithTimestamp(t, recordTimestamp);
            kafkaTopicPartitionState.setOffset(j);
        }
    }

    /**
     * Punctuated-mode emit path: extracts the record's timestamp, asks the partition state
     * whether the record produces a new watermark, emits the record and advances the offset
     * under the checkpoint lock, and finally processes the new watermark if there is one.
     *
     * @param t the record to emit
     * @param kafkaTopicPartitionState partition state; must be the punctuated-watermark variant
     * @param j the record's offset
     */
    private void emitRecordWithTimestampAndPunctuatedWatermark(T t, KafkaTopicPartitionState<KPH> kafkaTopicPartitionState, long j) {
        final KafkaTopicPartitionStateWithPunctuatedWatermarks punctuatedState = (KafkaTopicPartitionStateWithPunctuatedWatermarks) kafkaTopicPartitionState;
        final long recordTimestamp = punctuatedState.getTimestampForRecord(t);
        final Watermark candidateWatermark = punctuatedState.checkAndGetNewWatermark(t, recordTimestamp);

        synchronized (this.checkpointLock) {
            this.sourceContext.collectWithTimestamp(t, recordTimestamp);
            kafkaTopicPartitionState.setOffset(j);
        }

        // The watermark, if any, is handled only after the record itself went out.
        if (candidateWatermark == null) {
            return;
        }
        updateMinPunctuatedWatermark(candidateWatermark);
    }

    /**
     * Emits a new watermark if the minimum partition watermark across all partitions has
     * advanced past the largest watermark emitted so far.
     *
     * @param watermark the watermark just produced by one partition
     */
    private void updateMinPunctuatedWatermark(Watermark watermark) {
        // A partition watermark that is not above the emitted maximum cannot raise the minimum past it.
        if (watermark.getTimestamp() <= this.maxWatermarkSoFar) {
            return;
        }

        long minAcrossPartitions = Long.MAX_VALUE;
        for (KafkaTopicPartitionState<KPH> partitionState : this.allPartitions) {
            final long partitionWatermark = ((KafkaTopicPartitionStateWithPunctuatedWatermarks) partitionState).getCurrentPartitionWatermark();
            minAcrossPartitions = Math.min(minAcrossPartitions, partitionWatermark);
        }

        // Unlocked pre-check; the decision is repeated under the lock before emitting.
        if (minAcrossPartitions <= this.maxWatermarkSoFar) {
            return;
        }
        synchronized (this.checkpointLock) {
            if (minAcrossPartitions > this.maxWatermarkSoFar) {
                this.maxWatermarkSoFar = minAcrossPartitions;
                this.sourceContext.emitWatermark(new Watermark(minAcrossPartitions));
            }
        }
    }

    /**
     * Builds one state object per assigned partition. The concrete state type depends on
     * the timestamp/watermark mode; in the two watermark modes each partition gets its own
     * freshly deserialized assigner instance.
     *
     * @param list partitions to create state for
     * @param i timestamp/watermark mode (one of the three mode constants)
     * @param serializedValue serialized periodic assigner; dereferenced only in periodic mode
     * @param serializedValue2 serialized punctuated assigner; dereferenced only in punctuated mode
     * @param classLoader class loader used to deserialize the assigners
     * @return array with one state per partition, in the iteration order of {@code list}
     * @throws IOException if an assigner cannot be deserialized
     * @throws ClassNotFoundException if an assigner's class cannot be resolved
     * @throws RuntimeException if the mode is not one of the three known constants
     */
    private KafkaTopicPartitionState<KPH>[] initializePartitions(List<KafkaTopicPartition> list, int i, SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue, SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2, ClassLoader classLoader) throws IOException, ClassNotFoundException {
        final int partitionCount = list.size();
        int pos = 0;
        switch (i) {
            case NO_TIMESTAMPS_WATERMARKS: {
                KafkaTopicPartitionState<KPH>[] plainStates = new KafkaTopicPartitionState[partitionCount];
                for (KafkaTopicPartition partition : list) {
                    KPH handle = createKafkaPartitionHandle(partition);
                    plainStates[pos++] = new KafkaTopicPartitionState<>(partition, handle);
                }
                return plainStates;
            }
            case PERIODIC_WATERMARKS: {
                KafkaTopicPartitionStateWithPeriodicWatermarks[] periodicStates = new KafkaTopicPartitionStateWithPeriodicWatermarks[partitionCount];
                for (KafkaTopicPartition partition : list) {
                    KPH handle = createKafkaPartitionHandle(partition);
                    // Deserialize per partition so every partition owns a separate assigner instance.
                    AssignerWithPeriodicWatermarks assigner = (AssignerWithPeriodicWatermarks) serializedValue.deserializeValue(classLoader);
                    periodicStates[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks(partition, handle, assigner);
                }
                return periodicStates;
            }
            case PUNCTUATED_WATERMARKS: {
                KafkaTopicPartitionStateWithPunctuatedWatermarks[] punctuatedStates = new KafkaTopicPartitionStateWithPunctuatedWatermarks[partitionCount];
                for (KafkaTopicPartition partition : list) {
                    KPH handle = createKafkaPartitionHandle(partition);
                    // Deserialize per partition so every partition owns a separate assigner instance.
                    AssignerWithPunctuatedWatermarks assigner = (AssignerWithPunctuatedWatermarks) serializedValue2.deserializeValue(classLoader);
                    punctuatedStates[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks(partition, handle, assigner);
                }
                return punctuatedStates;
            }
            default:
                // Not reachable from the constructor, which only passes the three known modes.
                throw new RuntimeException("Unknown timestamp/watermark mode: " + i);
        }
    }

    /**
     * Registers two gauges per partition, named {@code <topic>-<partition>}: one under the
     * "current-offsets" sub-group and one under the "committed-offsets" sub-group.
     *
     * @param metricGroup parent group under which both sub-groups are created
     */
    protected void addOffsetStateGauge(MetricGroup metricGroup) {
        final MetricGroup currentOffsets = metricGroup.addGroup("current-offsets");
        final MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets");
        for (KafkaTopicPartitionState<KPH> partitionState : subscribedPartitions()) {
            final String gaugeName = partitionState.getTopic() + "-" + partitionState.getPartition();
            currentOffsets.gauge(gaugeName, new OffsetGauge(partitionState, OffsetGaugeType.CURRENT_OFFSET));
            committedOffsets.gauge(gaugeName, new OffsetGauge(partitionState, OffsetGaugeType.COMMITTED_OFFSET));
        }
    }

    // Compiler-synthesized support for assert statements: caches whether assertions are
    // disabled for this class so the checks in this file can be skipped cheaply.
    static {
        $assertionsDisabled = !AbstractFetcher.class.desiredAssertionStatus();
    }
}
