package org.apache.flink.streaming.connectors.kinesis.internals;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.HashKeyRange;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.SequenceNumberRange;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory;
import org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Factory;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
import org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil;
import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.class */
public class KinesisDataFetcher<T> {
    public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER;
    private static final Logger LOG;
    private final Properties configProps;
    private final List<String> streams;
    private final KinesisDeserializationSchema<T> deserializationSchema;
    private final KinesisShardAssigner shardAssigner;
    private final MetricGroup consumerMetricGroup;
    private final RuntimeContext runtimeContext;
    private final int totalNumberOfConsumerSubtasks;
    private final int indexOfThisConsumerSubtask;
    private final ExecutorService shardConsumersExecutor;
    private final Map<String, String> subscribedStreamsToLastDiscoveredShardIds;
    private final List<KinesisStreamShardState> subscribedShardsState;
    private final SourceFunction.SourceContext<T> sourceContext;
    private final Object checkpointLock;
    private final AtomicReference<Throwable> error;
    private final FlinkKinesisProxyFactory kinesisProxyFactory;
    private final FlinkKinesisProxyV2Factory kinesisProxyV2Factory;
    private final KinesisProxyInterface kinesis;
    private final RecordPublisherFactory recordPublisherFactory;
    private volatile Thread mainThread;
    private final AtomicInteger numberOfActiveShards;
    private volatile boolean running;
    private final AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
    private final WatermarkTracker watermarkTracker;
    private final RecordEmitter recordEmitter;
    private boolean isIdle;
    private ConcurrentHashMap<Integer, ShardWatermarkState> shardWatermarks;
    private long lastWatermark;
    private long nextWatermark;
    private long shardIdleIntervalMillis;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* renamed from: org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$1 */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher$1.class */
    public class AnonymousClass1 implements Runnable {
        AnonymousClass1() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                KinesisDataFetcher.this.recordEmitter.run();
            } catch (Throwable th) {
                KinesisDataFetcher.this.stopWithError(th);
            }
        }
    }

    /* renamed from: org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$2 */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher$2.class */
    public class AnonymousClass2 implements ThreadFactory {
        private final AtomicLong threadCount = new AtomicLong(0);
        final /* synthetic */ String val$subtaskName;

        AnonymousClass2(String str) {
            r8 = str;
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setName("shardConsumers-" + r8 + "-thread-" + this.threadCount.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher$AsyncKinesisRecordEmitter.class */
    public class AsyncKinesisRecordEmitter extends RecordEmitter<RecordWrapper<T>> {
        private AsyncKinesisRecordEmitter(KinesisDataFetcher kinesisDataFetcher) {
            this(100);
        }

        private AsyncKinesisRecordEmitter(int i) {
            super(i);
        }

        @Override // org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter
        public void emit(RecordWrapper<T> recordWrapper, RecordEmitter.RecordQueue<RecordWrapper<T>> recordQueue) {
            KinesisDataFetcher.this.emitRecordAndUpdateState(recordWrapper);
        }

        /* synthetic */ AsyncKinesisRecordEmitter(KinesisDataFetcher kinesisDataFetcher, AnonymousClass1 anonymousClass1) {
            this(kinesisDataFetcher);
        }

        /* synthetic */ AsyncKinesisRecordEmitter(KinesisDataFetcher kinesisDataFetcher, int i, AnonymousClass1 anonymousClass1) {
            this(i);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher$FlinkKinesisProxyFactory.class */
    public interface FlinkKinesisProxyFactory {
        KinesisProxyInterface create(Properties properties);
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher$FlinkKinesisProxyV2Factory.class */
    public interface FlinkKinesisProxyV2Factory {
        KinesisProxyV2Interface create(Properties properties);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher$PeriodicWatermarkEmitter.class */
    public class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
        private final ProcessingTimeService timerService;
        private final long interval;

        PeriodicWatermarkEmitter(ProcessingTimeService processingTimeService, long j) {
            this.timerService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
            this.interval = j;
        }

        public void start() {
            KinesisDataFetcher.LOG.debug("registering periodic watermark timer with interval {}", Long.valueOf(this.interval));
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, this);
        }

        public void onProcessingTime(long j) {
            KinesisDataFetcher.this.emitWatermark();
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, this);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher$RecordWrapper.class */
    public static class RecordWrapper<T> extends TimestampedValue<T> {
        int shardStateIndex;
        SequenceNumber lastSequenceNumber;
        long timestamp;
        Watermark watermark;

        private RecordWrapper(T t, long j) {
            super(t, j);
            this.timestamp = j;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        /* synthetic */ RecordWrapper(Object obj, long j, AnonymousClass1 anonymousClass1) {
            this(obj, j);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher$ShardWatermarkState.class */
    public static class ShardWatermarkState<T> {
        private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
        private RecordEmitter.RecordQueue<RecordWrapper<T>> emitQueue;
        private volatile long lastRecordTimestamp;
        private volatile long lastUpdated;
        private volatile Watermark lastEmittedRecordWatermark;

        private ShardWatermarkState() {
        }

        /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.ShardWatermarkState.access$602(org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$ShardWatermarkState, long):long
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        static /* synthetic */ long access$602(org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.ShardWatermarkState r6, long r7) {
            /*
                r0 = r6
                r1 = r7
                // decode failed: arraycopy: source index -1 out of bounds for object array[6]
                r0.lastRecordTimestamp = r1
                return r-1
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.ShardWatermarkState.access$602(org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$ShardWatermarkState, long):long");
        }

        /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.ShardWatermarkState.access$702(org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$ShardWatermarkState, long):long
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        static /* synthetic */ long access$702(org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.ShardWatermarkState r6, long r7) {
            /*
                r0 = r6
                r1 = r7
                // decode failed: arraycopy: source index -1 out of bounds for object array[6]
                r0.lastUpdated = r1
                return r-1
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.ShardWatermarkState.access$702(org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$ShardWatermarkState, long):long");
        }

        /* synthetic */ ShardWatermarkState(AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher$SyncKinesisRecordEmitter.class */
    public class SyncKinesisRecordEmitter extends KinesisDataFetcher<T>.AsyncKinesisRecordEmitter {
        private final ConcurrentHashMap<Integer, RecordEmitter.RecordQueue<RecordWrapper<T>>> queues;
        final /* synthetic */ KinesisDataFetcher this$0;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* renamed from: org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1 */
        /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher$SyncKinesisRecordEmitter$1.class */
        public class AnonymousClass1 implements RecordEmitter.RecordQueue<RecordWrapper<T>> {
            final /* synthetic */ SyncKinesisRecordEmitter this$1;

            AnonymousClass1(SyncKinesisRecordEmitter syncKinesisRecordEmitter) {
                this.this$1 = syncKinesisRecordEmitter;
            }

            public void put(RecordWrapper<T> recordWrapper) {
                this.this$1.emit((RecordWrapper) recordWrapper, (RecordEmitter.RecordQueue) this);
            }

            @Override // org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.RecordQueue
            public int getSize() {
                return 0;
            }

            @Override // org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.RecordQueue
            public RecordWrapper<T> peek() {
                return null;
            }

            @Override // org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.RecordQueue
            public /* bridge */ /* synthetic */ Object peek() {
                return peek();
            }

            @Override // org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.RecordQueue
            public /* bridge */ /* synthetic */ void put(Object obj) throws InterruptedException {
                put((RecordWrapper) obj);
            }
        }

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        private SyncKinesisRecordEmitter(KinesisDataFetcher kinesisDataFetcher) {
            super();
            this.this$0 = kinesisDataFetcher;
            this.queues = new ConcurrentHashMap<>();
        }

        @Override // org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter
        public RecordEmitter.RecordQueue<RecordWrapper<T>> getQueue(int i) {
            return this.queues.computeIfAbsent(Integer.valueOf(i), num -> {
                return new RecordEmitter.RecordQueue<RecordWrapper<T>>(this) { // from class: org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.SyncKinesisRecordEmitter.1
                    final /* synthetic */ SyncKinesisRecordEmitter this$1;

                    AnonymousClass1(SyncKinesisRecordEmitter this) {
                        this.this$1 = this;
                    }

                    public void put(RecordWrapper<T> recordWrapper) {
                        this.this$1.emit((RecordWrapper) recordWrapper, (RecordEmitter.RecordQueue) this);
                    }

                    @Override // org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.RecordQueue
                    public int getSize() {
                        return 0;
                    }

                    @Override // org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.RecordQueue
                    public RecordWrapper<T> peek() {
                        return null;
                    }

                    @Override // org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.RecordQueue
                    public /* bridge */ /* synthetic */ Object peek() {
                        return peek();
                    }

                    @Override // org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.RecordQueue
                    public /* bridge */ /* synthetic */ void put(Object obj) throws InterruptedException {
                        put((RecordWrapper) obj);
                    }
                };
            });
        }

        /* synthetic */ SyncKinesisRecordEmitter(KinesisDataFetcher kinesisDataFetcher, AnonymousClass1 anonymousClass1) {
            this(kinesisDataFetcher);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher$WatermarkSyncCallback.class */
    public class WatermarkSyncCallback implements ProcessingTimeCallback {
        private static final long LOG_INTERVAL_MILLIS = 60000;
        private final ProcessingTimeService timerService;
        private final long interval;
        private long lastGlobalWatermark = Long.MIN_VALUE;
        private long propagatedLocalWatermark = Long.MIN_VALUE;
        private int stalledWatermarkIntervalCount = 0;
        private long lastLogged;
        final /* synthetic */ KinesisDataFetcher this$0;

        WatermarkSyncCallback(KinesisDataFetcher kinesisDataFetcher, ProcessingTimeService processingTimeService, long j) {
            this.this$0 = kinesisDataFetcher;
            this.timerService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
            this.interval = j;
            MetricGroup addGroup = kinesisDataFetcher.consumerMetricGroup.addGroup("subtaskId", String.valueOf(kinesisDataFetcher.indexOfThisConsumerSubtask));
            addGroup.gauge("localWatermark", () -> {
                return Long.valueOf(this.this$0.nextWatermark);
            });
            addGroup.gauge("globalWatermark", () -> {
                return Long.valueOf(this.lastGlobalWatermark);
            });
        }

        public void start() {
            KinesisDataFetcher.LOG.info("Registering watermark tracker with interval {}", Long.valueOf(this.interval));
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, this);
        }

        public void onProcessingTime(long j) {
            if (this.this$0.nextWatermark != Long.MIN_VALUE) {
                long j2 = this.lastGlobalWatermark;
                if (this.this$0.isIdle && this.this$0.nextWatermark == this.propagatedLocalWatermark) {
                    KinesisDataFetcher.LOG.info("WatermarkSyncCallback subtask: {} is idle", Integer.valueOf(this.this$0.indexOfThisConsumerSubtask));
                } else {
                    j2 = this.this$0.watermarkTracker.updateWatermark(this.this$0.nextWatermark);
                    this.propagatedLocalWatermark = this.this$0.nextWatermark;
                }
                if (j - this.lastLogged > 60000) {
                    this.lastLogged = System.currentTimeMillis();
                    KinesisDataFetcher.LOG.info("WatermarkSyncCallback subtask: {} local watermark: {}, global watermark: {}, delta: {} timeouts: {}, emitter: {}", new Object[]{Integer.valueOf(this.this$0.indexOfThisConsumerSubtask), Long.valueOf(this.this$0.nextWatermark), Long.valueOf(j2), Long.valueOf(this.this$0.nextWatermark - j2), Long.valueOf(this.this$0.watermarkTracker.getUpdateTimeoutCount()), this.this$0.recordEmitter.printInfo()});
                    if (j2 == this.this$0.nextWatermark && j2 == this.lastGlobalWatermark) {
                        int i = this.stalledWatermarkIntervalCount;
                        this.stalledWatermarkIntervalCount = i + 1;
                        if (i > 5) {
                            this.stalledWatermarkIntervalCount = 0;
                            for (Map.Entry entry : this.this$0.shardWatermarks.entrySet()) {
                                RecordWrapper recordWrapper = (RecordWrapper) ((ShardWatermarkState) entry.getValue()).emitQueue.peek();
                                if (recordWrapper != null) {
                                    KinesisDataFetcher.LOG.info("stalled watermark {} key {} next watermark {} next timestamp {}", new Object[]{Long.valueOf(this.this$0.nextWatermark), entry.getKey(), recordWrapper.watermark, Long.valueOf(recordWrapper.timestamp)});
                                }
                            }
                        }
                    }
                }
                this.lastGlobalWatermark = j2;
                this.this$0.recordEmitter.setCurrentWatermark(j2);
            }
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, this);
        }
    }

    public KinesisDataFetcher(List<String> list, SourceFunction.SourceContext<T> sourceContext, RuntimeContext runtimeContext, Properties properties, KinesisDeserializationSchema<T> kinesisDeserializationSchema, KinesisShardAssigner kinesisShardAssigner, AssignerWithPeriodicWatermarks<T> assignerWithPeriodicWatermarks, WatermarkTracker watermarkTracker) {
        this(list, sourceContext, sourceContext.getCheckpointLock(), runtimeContext, properties, kinesisDeserializationSchema, kinesisShardAssigner, assignerWithPeriodicWatermarks, watermarkTracker, new AtomicReference(), new ArrayList(), createInitialSubscribedStreamsToLastDiscoveredShardsState(list), KinesisProxy::create, KinesisProxyV2Factory::createKinesisProxyV2);
    }

    @VisibleForTesting
    public KinesisDataFetcher(List<String> list, SourceFunction.SourceContext<T> sourceContext, Object obj, RuntimeContext runtimeContext, Properties properties, KinesisDeserializationSchema<T> kinesisDeserializationSchema, KinesisShardAssigner kinesisShardAssigner, AssignerWithPeriodicWatermarks<T> assignerWithPeriodicWatermarks, WatermarkTracker watermarkTracker, AtomicReference<Throwable> atomicReference, List<KinesisStreamShardState> list2, HashMap<String, String> hashMap, FlinkKinesisProxyFactory flinkKinesisProxyFactory, @Nullable FlinkKinesisProxyV2Factory flinkKinesisProxyV2Factory) {
        this.numberOfActiveShards = new AtomicInteger(0);
        this.running = true;
        this.shardWatermarks = new ConcurrentHashMap<>();
        this.lastWatermark = Long.MIN_VALUE;
        this.nextWatermark = Long.MIN_VALUE;
        this.shardIdleIntervalMillis = -1L;
        this.streams = (List) Preconditions.checkNotNull(list);
        this.configProps = (Properties) Preconditions.checkNotNull(properties);
        this.sourceContext = (SourceFunction.SourceContext) Preconditions.checkNotNull(sourceContext);
        this.checkpointLock = Preconditions.checkNotNull(obj);
        this.runtimeContext = (RuntimeContext) Preconditions.checkNotNull(runtimeContext);
        this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
        this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
        this.deserializationSchema = (KinesisDeserializationSchema) Preconditions.checkNotNull(kinesisDeserializationSchema);
        this.shardAssigner = (KinesisShardAssigner) Preconditions.checkNotNull(kinesisShardAssigner);
        this.periodicWatermarkAssigner = assignerWithPeriodicWatermarks;
        this.watermarkTracker = watermarkTracker;
        this.kinesisProxyFactory = (FlinkKinesisProxyFactory) Preconditions.checkNotNull(flinkKinesisProxyFactory);
        this.kinesisProxyV2Factory = flinkKinesisProxyV2Factory;
        this.kinesis = flinkKinesisProxyFactory.create(properties);
        this.recordPublisherFactory = createRecordPublisherFactory();
        this.consumerMetricGroup = runtimeContext.getMetricGroup().addGroup(KinesisConsumerMetricConstants.KINESIS_CONSUMER_METRICS_GROUP);
        this.error = (AtomicReference) Preconditions.checkNotNull(atomicReference);
        this.subscribedShardsState = (List) Preconditions.checkNotNull(list2);
        this.subscribedStreamsToLastDiscoveredShardIds = (Map) Preconditions.checkNotNull(hashMap);
        this.shardConsumersExecutor = createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
        this.recordEmitter = createRecordEmitter(properties);
        StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(properties, list);
    }

    private RecordEmitter createRecordEmitter(Properties properties) {
        return (this.periodicWatermarkAssigner == null || this.watermarkTracker == null) ? new SyncKinesisRecordEmitter(this, null) : new AsyncKinesisRecordEmitter(Integer.parseInt(properties.getProperty(ConsumerConfigConstants.WATERMARK_SYNC_QUEUE_CAPACITY, Integer.toString(100))));
    }

    protected ShardConsumer<T> createShardConsumer(Integer num, StreamShardHandle streamShardHandle, SequenceNumber sequenceNumber, MetricGroup metricGroup, KinesisDeserializationSchema<T> kinesisDeserializationSchema) throws InterruptedException {
        return new ShardConsumer<>(this, createRecordPublisher(sequenceNumber, this.configProps, metricGroup, streamShardHandle), num, streamShardHandle, sequenceNumber, new ShardConsumerMetricsReporter(metricGroup), kinesisDeserializationSchema);
    }

    protected RecordPublisherFactory createRecordPublisherFactory() {
        switch (ConsumerConfigConstants.RecordPublisherType.valueOf(this.configProps.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, ConsumerConfigConstants.RecordPublisherType.POLLING.name()))) {
            case EFO:
                return new FanOutRecordPublisherFactory(this.kinesisProxyV2Factory.create(this.configProps));
            case POLLING:
            default:
                return new PollingRecordPublisherFactory(this.kinesisProxyFactory);
        }
    }

    protected RecordPublisher createRecordPublisher(SequenceNumber sequenceNumber, Properties properties, MetricGroup metricGroup, StreamShardHandle streamShardHandle) throws InterruptedException {
        return this.recordPublisherFactory.create(AWSUtil.getStartingPosition(sequenceNumber, properties), properties, metricGroup, streamShardHandle);
    }

    public void runFetcher() throws Exception {
        if (this.running) {
            this.mainThread = Thread.currentThread();
            boolean z = false;
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, String> entry : this.subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
                if (entry.getValue() != null) {
                    z = true;
                } else {
                    sb.append(entry.getKey()).append(", ");
                }
            }
            if (sb.length() != 0 && LOG.isWarnEnabled()) {
                LOG.warn("Subtask {} has failed to find any shards for the following subscribed streams: {}", Integer.valueOf(this.indexOfThisConsumerSubtask), sb.toString());
            }
            if (!z) {
                throw new RuntimeException("No shards can be found for all subscribed streams: " + this.streams);
            }
            for (int i = 0; i < this.subscribedShardsState.size(); i++) {
                KinesisStreamShardState kinesisStreamShardState = this.subscribedShardsState.get(i);
                if (!kinesisStreamShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
                    if (LOG.isInfoEnabled()) {
                        LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}", new Object[]{Integer.valueOf(this.indexOfThisConsumerSubtask), kinesisStreamShardState.getStreamShardHandle().toString(), kinesisStreamShardState.getLastProcessedSequenceNum(), Integer.valueOf(i)});
                    }
                    StreamShardHandle streamShardHandle = this.subscribedShardsState.get(i).getStreamShardHandle();
                    KinesisDeserializationSchema<T> clonedDeserializationSchema = getClonedDeserializationSchema();
                    clonedDeserializationSchema.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(this.runtimeContext, metricGroup -> {
                        return this.consumerMetricGroup.addGroup("subtaskId", String.valueOf(this.indexOfThisConsumerSubtask)).addGroup(KinesisConsumerMetricConstants.SHARD_METRICS_GROUP, streamShardHandle.getShard().getShardId()).addGroup("user");
                    }));
                    this.shardConsumersExecutor.submit(createShardConsumer(Integer.valueOf(i), streamShardHandle, this.subscribedShardsState.get(i).getLastProcessedSequenceNum(), registerShardMetricGroup(this.consumerMetricGroup, this.subscribedShardsState.get(i)), clonedDeserializationSchema));
                }
            }
            if (this.periodicWatermarkAssigner != null) {
                long autoWatermarkInterval = this.runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
                if (autoWatermarkInterval > 0) {
                    ProcessingTimeService processingTimeService = this.runtimeContext.getProcessingTimeService();
                    LOG.info("Starting periodic watermark emitter with interval {}", Long.valueOf(autoWatermarkInterval));
                    new PeriodicWatermarkEmitter(processingTimeService, autoWatermarkInterval).start();
                    if (this.watermarkTracker != null) {
                        long parseLong = Long.parseLong(getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS, Long.toString(ConsumerConfigConstants.DEFAULT_WATERMARK_SYNC_MILLIS)));
                        this.watermarkTracker.setUpdateTimeoutMillis(parseLong * 3);
                        this.watermarkTracker.open(this.runtimeContext);
                        new WatermarkSyncCallback(this, processingTimeService, parseLong).start();
                        this.recordEmitter.setMaxLookaheadMillis(Math.max(Long.parseLong(getConsumerConfiguration().getProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, Long.toString(0L))), parseLong * 3));
                        Thread thread = new Thread(new Runnable() { // from class: org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.1
                            AnonymousClass1() {
                            }

                            @Override // java.lang.Runnable
                            public void run() {
                                try {
                                    KinesisDataFetcher.this.recordEmitter.run();
                                } catch (Throwable th) {
                                    KinesisDataFetcher.this.stopWithError(th);
                                }
                            }
                        });
                        thread.setName("recordEmitter-" + this.runtimeContext.getTaskNameWithSubtasks());
                        thread.setDaemon(true);
                        thread.start();
                    }
                }
                this.shardIdleIntervalMillis = Long.parseLong(getConsumerConfiguration().getProperty(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, Long.toString(-1L)));
            }
            long parseLong2 = Long.parseLong(this.configProps.getProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, Long.toString(10000L)));
            if (this.numberOfActiveShards.get() == 0) {
                LOG.info("Subtask {} has no active shards to read on startup; marking the subtask as temporarily idle ...", Integer.valueOf(this.indexOfThisConsumerSubtask));
                this.sourceContext.markAsTemporarilyIdle();
            }
            while (this.running) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...", Integer.valueOf(this.indexOfThisConsumerSubtask));
                }
                for (StreamShardHandle streamShardHandle2 : discoverNewShardsToSubscribe()) {
                    KinesisStreamShardState kinesisStreamShardState2 = new KinesisStreamShardState(convertToStreamShardMetadata(streamShardHandle2), streamShardHandle2, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
                    int registerNewSubscribedShardState = registerNewSubscribedShardState(kinesisStreamShardState2);
                    if (LOG.isInfoEnabled()) {
                        LOG.info("Subtask {} has discovered a new shard {} due to resharding, and will start consuming the shard from sequence number {} with ShardConsumer {}", new Object[]{Integer.valueOf(this.indexOfThisConsumerSubtask), kinesisStreamShardState2.getStreamShardHandle().toString(), kinesisStreamShardState2.getLastProcessedSequenceNum(), Integer.valueOf(registerNewSubscribedShardState)});
                    }
                    StreamShardHandle streamShardHandle3 = kinesisStreamShardState2.getStreamShardHandle();
                    KinesisDeserializationSchema<T> clonedDeserializationSchema2 = getClonedDeserializationSchema();
                    clonedDeserializationSchema2.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(this.runtimeContext, metricGroup2 -> {
                        return this.consumerMetricGroup.addGroup("subtaskId", String.valueOf(this.indexOfThisConsumerSubtask)).addGroup(KinesisConsumerMetricConstants.SHARD_METRICS_GROUP, streamShardHandle3.getShard().getShardId()).addGroup("user");
                    }));
                    this.shardConsumersExecutor.submit(createShardConsumer(Integer.valueOf(registerNewSubscribedShardState), kinesisStreamShardState2.getStreamShardHandle(), kinesisStreamShardState2.getLastProcessedSequenceNum(), registerShardMetricGroup(this.consumerMetricGroup, kinesisStreamShardState2), clonedDeserializationSchema2));
                }
                if (this.running && parseLong2 != 0) {
                    try {
                        Thread.sleep(parseLong2);
                    } catch (InterruptedException e) {
                    }
                }
            }
            try {
                awaitTermination();
            } catch (InterruptedException e2) {
                this.error.compareAndSet(null, e2);
            }
            Throwable th = this.error.get();
            if (th != null) {
                if (th instanceof Exception) {
                    throw ((Exception) th);
                }
                if (!(th instanceof Error)) {
                    throw new Exception(th);
                }
                throw ((Error) th);
            }
        }
    }

    public HashMap<StreamShardMetadata, SequenceNumber> snapshotState() {
        if (!$assertionsDisabled && !Thread.holdsLock(this.checkpointLock)) {
            throw new AssertionError();
        }
        HashMap<StreamShardMetadata, SequenceNumber> hashMap = new HashMap<>();
        for (KinesisStreamShardState kinesisStreamShardState : this.subscribedShardsState) {
            hashMap.put(kinesisStreamShardState.getStreamShardMetadata(), kinesisStreamShardState.getLastProcessedSequenceNum());
        }
        return hashMap;
    }

    public void shutdownFetcher() {
        this.running = false;
        StreamConsumerRegistrarUtil.deregisterStreamConsumers(this.configProps, this.streams);
        this.recordPublisherFactory.close();
        this.shardConsumersExecutor.shutdownNow();
        if (this.mainThread != null) {
            this.mainThread.interrupt();
        }
        if (this.watermarkTracker != null) {
            this.watermarkTracker.close();
        }
        this.recordEmitter.stop();
        if (LOG.isInfoEnabled()) {
            LOG.info("Shutting down the shard consumer threads of subtask {} ...", Integer.valueOf(this.indexOfThisConsumerSubtask));
        }
    }

    public void awaitTermination() throws InterruptedException {
        do {
        } while (!this.shardConsumersExecutor.awaitTermination(1L, TimeUnit.MINUTES));
    }

    public void stopWithError(Throwable th) {
        if (this.error.compareAndSet(null, th)) {
            shutdownFetcher();
        }
    }

    public void advanceLastDiscoveredShardOfStream(String str, String str2) {
        String str3 = this.subscribedStreamsToLastDiscoveredShardIds.get(str);
        if (str3 == null) {
            this.subscribedStreamsToLastDiscoveredShardIds.put(str, str2);
        } else if (shouldAdvanceLastDiscoveredShardId(str2, str3)) {
            this.subscribedStreamsToLastDiscoveredShardIds.put(str, str2);
        }
    }

    protected boolean shouldAdvanceLastDiscoveredShardId(String str, String str2) {
        return StreamShardHandle.compareShardIds(str, str2) > 0;
    }

    public List<StreamShardHandle> discoverNewShardsToSubscribe() throws InterruptedException {
        LinkedList linkedList = new LinkedList();
        GetShardListResult shardList = this.kinesis.getShardList(this.subscribedStreamsToLastDiscoveredShardIds);
        if (shardList.hasRetrievedShards()) {
            for (String str : shardList.getStreamsWithRetrievedShards()) {
                for (StreamShardHandle streamShardHandle : shardList.getRetrievedShardListOfStream(str)) {
                    if (isThisSubtaskShouldSubscribeTo(this.shardAssigner.assign(streamShardHandle, this.totalNumberOfConsumerSubtasks), this.totalNumberOfConsumerSubtasks, this.indexOfThisConsumerSubtask)) {
                        linkedList.add(streamShardHandle);
                    }
                }
                advanceLastDiscoveredShardOfStream(str, shardList.getLastSeenShardOfStream(str).getShard().getShardId());
            }
        }
        return linkedList;
    }

    public Properties getConsumerConfiguration() {
        return this.configProps;
    }

    private KinesisDeserializationSchema<T> getClonedDeserializationSchema() {
        try {
            return (KinesisDeserializationSchema) InstantiationUtil.clone(this.deserializationSchema, this.runtimeContext.getUserCodeClassLoader());
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public void emitRecordAndUpdateState(T t, long j, int i, SequenceNumber sequenceNumber) {
        ShardWatermarkState shardWatermarkState = this.shardWatermarks.get(Integer.valueOf(i));
        Preconditions.checkNotNull(shardWatermarkState, "shard watermark state initialized in registerNewSubscribedShardState");
        Watermark watermark = null;
        if (shardWatermarkState.periodicWatermarkAssigner != null) {
            j = shardWatermarkState.periodicWatermarkAssigner.extractTimestamp(t, shardWatermarkState.lastRecordTimestamp);
            watermark = shardWatermarkState.periodicWatermarkAssigner.getCurrentWatermark();
        }
        ShardWatermarkState.access$602(shardWatermarkState, j);
        ShardWatermarkState.access$702(shardWatermarkState, getCurrentTimeMillis());
        RecordWrapper recordWrapper = new RecordWrapper(t, j);
        recordWrapper.shardStateIndex = i;
        recordWrapper.lastSequenceNumber = sequenceNumber;
        recordWrapper.watermark = watermark;
        try {
            shardWatermarkState.emitQueue.put(recordWrapper);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void emitRecordAndUpdateState(RecordWrapper<T> recordWrapper) {
        synchronized (this.checkpointLock) {
            if (recordWrapper.getValue() != null) {
                this.sourceContext.collectWithTimestamp(recordWrapper.getValue(), recordWrapper.timestamp);
                this.shardWatermarks.get(Integer.valueOf(recordWrapper.shardStateIndex)).lastEmittedRecordWatermark = recordWrapper.watermark;
            } else {
                LOG.warn("Skipping non-deserializable record at sequence number {} of shard {}.", recordWrapper.lastSequenceNumber, this.subscribedShardsState.get(recordWrapper.shardStateIndex).getStreamShardHandle());
            }
            updateState(recordWrapper.shardStateIndex, recordWrapper.lastSequenceNumber);
        }
    }

    public final void updateState(int i, SequenceNumber sequenceNumber) {
        synchronized (this.checkpointLock) {
            this.subscribedShardsState.get(i).setLastProcessedSequenceNum(sequenceNumber);
            if (sequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
                LOG.info("Subtask {} has reached the end of subscribed shard: {}", Integer.valueOf(this.indexOfThisConsumerSubtask), this.subscribedShardsState.get(i).getStreamShardHandle());
                if (this.numberOfActiveShards.decrementAndGet() == 0) {
                    LOG.info("Subtask {} has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...", Integer.valueOf(this.indexOfThisConsumerSubtask));
                    this.sourceContext.markAsTemporarilyIdle();
                }
            }
        }
    }

    public int registerNewSubscribedShardState(KinesisStreamShardState kinesisStreamShardState) {
        int size;
        synchronized (this.checkpointLock) {
            this.subscribedShardsState.add(kinesisStreamShardState);
            if (!kinesisStreamShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
                this.numberOfActiveShards.incrementAndGet();
            }
            size = this.subscribedShardsState.size() - 1;
            if (this.shardWatermarks.get(Integer.valueOf(size)) == null) {
                ShardWatermarkState shardWatermarkState = new ShardWatermarkState(null);
                try {
                    shardWatermarkState.periodicWatermarkAssigner = InstantiationUtil.clone(this.periodicWatermarkAssigner);
                    shardWatermarkState.emitQueue = this.recordEmitter.getQueue(size);
                    ShardWatermarkState.access$702(shardWatermarkState, getCurrentTimeMillis());
                    ShardWatermarkState.access$602(shardWatermarkState, Long.MIN_VALUE);
                    this.shardWatermarks.put(Integer.valueOf(size), shardWatermarkState);
                } catch (Exception e) {
                    throw new RuntimeException("Failed to instantiate new WatermarkAssigner", e);
                }
            }
        }
        return size;
    }

    @VisibleForTesting
    protected long getCurrentTimeMillis() {
        return System.currentTimeMillis();
    }

    @VisibleForTesting
    protected void emitWatermark() {
        LOG.debug("Evaluating watermark for subtask {} time {}", Integer.valueOf(this.indexOfThisConsumerSubtask), Long.valueOf(getCurrentTimeMillis()));
        long j = Long.MAX_VALUE;
        long j2 = Long.MAX_VALUE;
        long currentTimeMillis = this.shardIdleIntervalMillis > 0 ? getCurrentTimeMillis() - this.shardIdleIntervalMillis : Long.MAX_VALUE;
        for (Map.Entry<Integer, ShardWatermarkState> entry : this.shardWatermarks.entrySet()) {
            Watermark watermark = entry.getValue().lastEmittedRecordWatermark;
            if (watermark != null && (entry.getValue().lastUpdated >= currentTimeMillis || entry.getValue().emitQueue.getSize() > 0 || watermark.getTimestamp() > this.lastWatermark)) {
                j = Math.min(j, watermark.getTimestamp());
                RecordWrapper recordWrapper = (RecordWrapper) entry.getValue().emitQueue.peek();
                j2 = Math.min(j2, (recordWrapper != null ? recordWrapper.watermark : watermark).getTimestamp());
            }
        }
        if (j == Long.MAX_VALUE) {
            if (this.shardWatermarks.isEmpty() || this.shardIdleIntervalMillis > 0) {
                LOG.info("No active shard for subtask {}, marking the source idle.", Integer.valueOf(this.indexOfThisConsumerSubtask));
                this.sourceContext.markAsTemporarilyIdle();
                this.isIdle = true;
                return;
            }
            return;
        }
        if (j > this.lastWatermark) {
            LOG.debug("Emitting watermark {} from subtask {}", Long.valueOf(j), Integer.valueOf(this.indexOfThisConsumerSubtask));
            this.sourceContext.emitWatermark(new Watermark(j));
            this.lastWatermark = j;
            this.isIdle = false;
        }
        this.nextWatermark = j2;
    }

    private MetricGroup registerShardMetricGroup(MetricGroup metricGroup, KinesisStreamShardState kinesisStreamShardState) {
        return metricGroup.addGroup(KinesisConsumerMetricConstants.STREAM_METRICS_GROUP, kinesisStreamShardState.getStreamShardHandle().getStreamName()).addGroup(KinesisConsumerMetricConstants.SHARD_METRICS_GROUP, kinesisStreamShardState.getStreamShardHandle().getShard().getShardId());
    }

    public static boolean isThisSubtaskShouldSubscribeTo(int i, int i2, int i3) {
        return Math.abs(i % i2) == i3;
    }

    @VisibleForTesting
    protected ExecutorService createShardConsumersThreadPool(String str) {
        return Executors.newCachedThreadPool(new ThreadFactory() { // from class: org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.2
            private final AtomicLong threadCount = new AtomicLong(0);
            final /* synthetic */ String val$subtaskName;

            AnonymousClass2(String str2) {
                r8 = str2;
            }

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable);
                thread.setName("shardConsumers-" + r8 + "-thread-" + this.threadCount.getAndIncrement());
                thread.setDaemon(true);
                return thread;
            }
        });
    }

    @VisibleForTesting
    public List<KinesisStreamShardState> getSubscribedShardsState() {
        return this.subscribedShardsState;
    }

    public static HashMap<String, String> createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> list) {
        HashMap<String, String> hashMap = new HashMap<>();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), null);
        }
        return hashMap;
    }

    public static StreamShardMetadata convertToStreamShardMetadata(StreamShardHandle streamShardHandle) {
        StreamShardMetadata streamShardMetadata = new StreamShardMetadata();
        streamShardMetadata.setStreamName(streamShardHandle.getStreamName());
        streamShardMetadata.setShardId(streamShardHandle.getShard().getShardId());
        streamShardMetadata.setParentShardId(streamShardHandle.getShard().getParentShardId());
        streamShardMetadata.setAdjacentParentShardId(streamShardHandle.getShard().getAdjacentParentShardId());
        if (streamShardHandle.getShard().getHashKeyRange() != null) {
            streamShardMetadata.setStartingHashKey(streamShardHandle.getShard().getHashKeyRange().getStartingHashKey());
            streamShardMetadata.setEndingHashKey(streamShardHandle.getShard().getHashKeyRange().getEndingHashKey());
        }
        if (streamShardHandle.getShard().getSequenceNumberRange() != null) {
            streamShardMetadata.setStartingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getStartingSequenceNumber());
            streamShardMetadata.setEndingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getEndingSequenceNumber());
        }
        return streamShardMetadata;
    }

    public static StreamShardHandle convertToStreamShardHandle(StreamShardMetadata streamShardMetadata) {
        Shard shard = new Shard();
        shard.withShardId(streamShardMetadata.getShardId());
        shard.withParentShardId(streamShardMetadata.getParentShardId());
        shard.withAdjacentParentShardId(streamShardMetadata.getAdjacentParentShardId());
        HashKeyRange hashKeyRange = new HashKeyRange();
        hashKeyRange.withStartingHashKey(streamShardMetadata.getStartingHashKey());
        hashKeyRange.withEndingHashKey(streamShardMetadata.getEndingHashKey());
        shard.withHashKeyRange(hashKeyRange);
        SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
        sequenceNumberRange.withStartingSequenceNumber(streamShardMetadata.getStartingSequenceNumber());
        sequenceNumberRange.withEndingSequenceNumber(streamShardMetadata.getEndingSequenceNumber());
        shard.withSequenceNumberRange(sequenceNumberRange);
        return new StreamShardHandle(streamShardMetadata.getStreamName(), shard);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2019220112:
                if (implMethodName.equals("lambda$static$d4637477$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/streaming/connectors/kinesis/KinesisShardAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("assign") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle;I)I") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle;I)I")) {
                    return (streamShardHandle, i) -> {
                        return streamShardHandle.hashCode();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        $assertionsDisabled = !KinesisDataFetcher.class.desiredAssertionStatus();
        DEFAULT_SHARD_ASSIGNER = (streamShardHandle, i) -> {
            return streamShardHandle.hashCode();
        };
        LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
    }
}
