/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.kinesis.impl.source;

import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.EventTimeMapper;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.kinesis.impl.KinesisUtil;
import com.hazelcast.jet.kinesis.impl.source.HashRange;
import com.hazelcast.jet.kinesis.impl.source.InitialShardIterators;
import com.hazelcast.jet.kinesis.impl.source.KinesisDataSerializerHook;
import com.hazelcast.jet.kinesis.impl.source.RangeMonitor;
import com.hazelcast.jet.kinesis.impl.source.ShardQueue;
import com.hazelcast.jet.kinesis.impl.source.ShardReader;
import com.hazelcast.jet.retry.RetryStrategy;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Source processor that reads records from the shards of a single Kinesis
 * stream, maps each record through a user supplied projection function and
 * emits the results together with the items produced by an
 * {@link EventTimeMapper}.
 *
 * <p>Shards are handed to the processor through a {@link ShardQueue}; one
 * {@link ShardReader} is kept per open shard and the readers are probed in a
 * round-robin fashion. Per-shard progress (last seen sequence number,
 * watermark, closed flag) is tracked in {@link ShardStates} and written to /
 * restored from snapshots.
 *
 * @param <T> type of the items produced by the projection function
 */
public class KinesisSourceP<T>
extends AbstractProcessor
implements DynamicMetricsProvider {
    /** Kinesis client handed to every {@link ShardReader} created by this processor. */
    @Nonnull
    private final AmazonKinesisAsync kinesis;
    /** Name of the stream being read; also used in log messages. */
    @Nonnull
    private final String stream;
    /** Produces the traversers that are emitted for events, idleness and removed partitions. */
    @Nonnull
    private final EventTimeMapper<? super T> eventTimeMapper;
    /** Hash range used to decide which restored shards are reported to the monitor. */
    @Nonnull
    private final HashRange memberHashRange;
    /** Hash range used to decide which restored shard states this processor keeps. */
    @Nonnull
    private final HashRange processorHashRange;
    /** Progress bookkeeping for every shard this processor knows about. */
    @Nonnull
    private final ShardStates shardStates = new ShardStates();
    /** Source of "shard added" and "shard expired" notifications. */
    @Nonnull
    private final ShardQueue shardQueue;
    /** Optional monitor; when present it is run on every {@link #complete()} call. */
    @Nullable
    private final RangeMonitor monitor;
    /** Active readers; the list index doubles as the partition index in {@link #eventTimeMapper}. */
    @Nonnull
    private final List<ShardReader> shardReaders = new ArrayList<>();
    @Nonnull
    private final RetryStrategy retryStrategy;
    @Nonnull
    private final InitialShardIterators initialShardIterators;
    /** Maps a raw record and the shard it came from to the emitted item. */
    private final BiFunctionEx<? super Record, ? super Shard, ? extends T> projectionFn;
    /** Global processor index, set in {@link #init}; only used for logging. */
    private int id;
    private ILogger logger;
    /** Items that still have to be emitted before any further work is done. */
    private Traverser<Object> traverser = Traversers.empty();
    /** Non-null only while a snapshot is being written. */
    private Traverser<Map.Entry<BroadcastKey<String>, ShardState>> snapshotTraverser;
    /** Index of the reader that will be probed next (round-robin cursor). */
    private int nextReader;

    /**
     * Creates the processor.
     *
     * @param kinesis               client passed on to the shard readers
     * @param stream                name of the stream to read
     * @param eventTimePolicy       policy used to build the {@link EventTimeMapper}
     * @param memberHashRange       hash range checked before a restored shard is reported to {@code monitor}
     * @param processorHashRange    hash range checked before a restored shard state is kept
     * @param shardQueue            queue delivering added and expired shards
     * @param monitor               optional range monitor, may be {@code null}
     * @param retryStrategy         retry strategy passed on to the shard readers
     * @param initialShardIterators initial iterator configuration passed on to the shard readers
     * @param projectionFn          function mapping a record and its shard to the emitted item
     * @throws NullPointerException if any of the {@code @Nonnull} arguments checked here is {@code null}
     */
    public KinesisSourceP(@Nonnull AmazonKinesisAsync kinesis, @Nonnull String stream, @Nonnull EventTimePolicy<? super T> eventTimePolicy, @Nonnull HashRange memberHashRange, @Nonnull HashRange processorHashRange, @Nonnull ShardQueue shardQueue, @Nullable RangeMonitor monitor, @Nonnull RetryStrategy retryStrategy, @Nonnull InitialShardIterators initialShardIterators, BiFunctionEx<? super Record, ? super Shard, ? extends T> projectionFn) {
        this.kinesis = Objects.requireNonNull(kinesis, "kinesis");
        this.stream = Objects.requireNonNull(stream, "stream");
        this.eventTimeMapper = new EventTimeMapper<>(eventTimePolicy);
        this.memberHashRange = Objects.requireNonNull(memberHashRange, "memberHashRange");
        this.processorHashRange = Objects.requireNonNull(processorHashRange, "processorHashRange");
        // Fail fast for the remaining @Nonnull collaborators as well, instead of
        // surfacing a NullPointerException only when they are first used.
        this.shardQueue = Objects.requireNonNull(shardQueue, "shardQueue");
        this.monitor = monitor;
        this.retryStrategy = Objects.requireNonNull(retryStrategy, "retryStrategy");
        this.initialShardIterators = Objects.requireNonNull(initialShardIterators, "initialShardIterators");
        this.projectionFn = projectionFn;
    }

    /**
     * Captures the logger and the global processor index from the context and
     * logs the hash range handled by this instance at fine level.
     */
    protected void init(@Nonnull Processor.Context context) throws Exception {
        super.init(context);
        this.logger = context.logger();
        this.id = context.globalProcessorIndex();
        if (this.logger.isFineEnabled()) {
            this.logger.fine("Processor " + this.id + " handles " + this.processorHashRange);
        }
    }

    /**
     * Performs one round of work: flushes pending output, runs the monitor,
     * processes at most one shard queue notification and probes the readers.
     *
     * @return always {@code false}; this method never reports completion
     */
    public boolean complete() {
        // Nothing else is done until the previously produced items are fully emitted.
        if (!this.emitFromTraverser(this.traverser)) {
            return false;
        }
        this.runMonitor();
        this.checkForNewShards();
        this.runReaders();
        return false;
    }

    /** Runs the range monitor, if this processor instance has one. */
    private void runMonitor() {
        if (this.monitor != null) {
            this.monitor.run();
        }
    }

    /**
     * Handles at most one notification from the shard queue. An expired shard
     * takes precedence: its state is dropped and added shards are not polled
     * in the same call.
     */
    private void checkForNewShards() {
        String shardId = this.shardQueue.pollExpired();
        if (shardId != null) {
            this.shardStates.remove(shardId);
            return;
        }
        Shard shard = this.shardQueue.pollAdded();
        if (shard != null) {
            this.addShardReader(shard);
        }
    }

    /**
     * Probes the readers round-robin, starting at {@link #nextReader}, and
     * stops at the first one that either has data or reports its shard as
     * closed. If no reader produces such a result, the idle output of the
     * event time mapper is emitted instead.
     */
    private void runReaders() {
        if (!this.shardReaders.isEmpty()) {
            long currentTime = System.nanoTime();
            for (int i = 0; i < this.shardReaders.size(); ++i) {
                int currentReader = this.nextReader;
                ShardReader reader = this.shardReaders.get(currentReader);
                this.nextReader = KinesisSourceP.incrementCircular(currentReader, this.shardReaders.size());
                ShardReader.Result result = reader.probe(currentTime);
                if (ShardReader.Result.HAS_DATA.equals(result)) {
                    Shard shard = reader.getShard();
                    // The projection and event-time mapping are applied as the traverser is
                    // consumed, with the reader index used as the partition index.
                    this.traverser = reader.clearData().flatMap(record -> {
                        T payload = this.projectionFn.apply(record, shard);
                        return this.eventTimeMapper.flatMapEvent(payload, currentReader, record.getApproximateArrivalTimestamp().getTime());
                    });
                    // A negative watermark is stored as "no watermark" (null) in the shard state.
                    long currentWatermark = this.eventTimeMapper.getWatermark(currentReader);
                    Long watermark = currentWatermark < 0L ? null : Long.valueOf(currentWatermark);
                    this.shardStates.update(shard, reader.getLastSeenSeqNo(), watermark);
                    this.emitFromTraverser(this.traverser);
                    return;
                }
                if (ShardReader.Result.CLOSED.equals(result)) {
                    Shard shard = reader.getShard();
                    this.logger.info("Shard " + shard.getShardId() + " of stream " + this.stream + " closed");
                    this.shardStates.close(shard);
                    // Removing a reader shifts the indices of the ones after it, so the
                    // round-robin cursor is simply reset to the start of the list.
                    this.nextReader = 0;
                    this.traverser = this.removeShardReader(currentReader);
                    this.emitFromTraverser(this.traverser);
                    return;
                }
            }
        }
        this.traverser = this.eventTimeMapper.flatMapIdle();
        this.emitFromTraverser(this.traverser);
    }

    /**
     * Writes the state of every tracked shard to the snapshot, after pending
     * output has been flushed.
     *
     * @return {@code true} once all pending items and all snapshot entries have been emitted
     */
    public boolean saveToSnapshot() {
        if (!this.emitFromTraverser(this.traverser)) {
            return false;
        }
        if (this.snapshotTraverser == null) {
            // The traverser resets itself once exhausted so that the next snapshot starts afresh.
            this.snapshotTraverser = Traversers.traverseStream(this.shardStates.snapshotEntries()).onFirstNull(() -> {
                this.snapshotTraverser = null;
                if (this.getLogger().isFinestEnabled()) {
                    this.getLogger().finest("Finished saving snapshot. Saved shard states: " + this.shardStates);
                }
            });
        }
        return this.emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    /**
     * Restores one shard state from a snapshot. The state is kept only if the
     * shard's starting hash key belongs to this processor's hash range; the
     * shard is additionally reported to the monitor (when there is one) if it
     * belongs to the member's hash range.
     *
     * @param key   snapshot key; not used
     * @param value the {@link ShardState} to restore
     */
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        ShardState state = (ShardState)value;
        String shardId = state.getShardId();
        BigInteger startingHashKey = state.getStartingHashKey();
        if (KinesisUtil.shardBelongsToRange(startingHashKey, this.processorHashRange)) {
            boolean closed = state.isClosed();
            String seqNo = state.getLastSeenSeqNo();
            Long watermark = state.getWatermark();
            this.shardStates.update(shardId, startingHashKey, closed, seqNo, watermark);
        }
        if (this.monitor != null && KinesisUtil.shardBelongsToRange(startingHashKey, this.memberHashRange)) {
            this.monitor.addKnownShard(shardId, startingHashKey);
        }
    }

    /**
     * Starts reading the given shard unless its known state says it is already
     * closed. A new event time partition is added for the reader and, if a
     * watermark is known for the shard, restored into it.
     */
    private void addShardReader(Shard shard) {
        String shardId = shard.getShardId();
        ShardState shardState = this.shardStates.get(shardId);
        if (!shardState.isClosed()) {
            // The new reader is appended, so its index is the current list size.
            int readerIndex = this.shardReaders.size();
            String lastSeenSeqNo = shardState.getLastSeenSeqNo();
            this.shardReaders.add(this.initShardReader(shard, lastSeenSeqNo));
            this.eventTimeMapper.addPartitions(1);
            Long watermark = shardState.getWatermark();
            if (watermark != null) {
                this.eventTimeMapper.restoreWatermark(readerIndex, watermark.longValue());
            }
        }
    }

    /**
     * Removes the reader at the given index together with its event time partition.
     *
     * @return the traverser returned by the event time mapper for the removed partition
     */
    private Traverser<Object> removeShardReader(int index) {
        this.shardReaders.remove(index);
        return this.eventTimeMapper.removePartition(index);
    }

    /** Logs the shard assignment and creates the reader for the shard. */
    @Nonnull
    private ShardReader initShardReader(Shard shard, String lastSeenSeqNo) {
        this.logger.info("Shard " + shard.getShardId() + " of stream " + this.stream + " assigned to processor instance " + this.id);
        return new ShardReader(this.kinesis, this.stream, shard, lastSeenSeqNo, this.retryStrategy, this.initialShardIterators, this.logger);
    }

    /** Delegates metrics collection to every active shard reader, each with its own descriptor copy. */
    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        for (ShardReader shardReader : this.shardReaders) {
            shardReader.provideDynamicMetrics(descriptor.copy(), context);
        }
    }

    public boolean closeIsCooperative() {
        return true;
    }

    /**
     * Returns {@code v + 1}, wrapping around to {@code 0} when that value equals {@code limit}.
     */
    private static int incrementCircular(int v, int limit) {
        if (++v == limit) {
            v = 0;
        }
        return v;
    }

    /**
     * Serializable progress record of a single shard: its id, starting hash
     * key, whether it has been closed, and the last seen sequence number and
     * watermark.
     */
    public static final class ShardState
    implements IdentifiedDataSerializable {
        /**
         * Placeholder returned for shards without recorded state. The instance
         * is shared and has setters, so callers should treat it as read-only.
         */
        public static final ShardState EMPTY = new ShardState();
        private String shardId;
        private BigInteger startingHashKey;
        private boolean closed;
        private String lastSeenSeqNo;
        /** {@code null} when no watermark is known. */
        private Long watermark;

        /** Creates an empty state; also used when reading an instance via {@link #readData}. */
        public ShardState() {
        }

        public ShardState(String shardId) {
            this.shardId = shardId;
        }

        public String getShardId() {
            return this.shardId;
        }

        public void setShardId(String shardId) {
            this.shardId = shardId;
        }

        public BigInteger getStartingHashKey() {
            return this.startingHashKey;
        }

        public void setStartingHashKey(BigInteger startingHashKey) {
            this.startingHashKey = startingHashKey;
        }

        public boolean isClosed() {
            return this.closed;
        }

        public void setClosed(boolean closed) {
            this.closed = closed;
        }

        public String getLastSeenSeqNo() {
            return this.lastSeenSeqNo;
        }

        public void setLastSeenSeqNo(String lastSeenSeqNo) {
            this.lastSeenSeqNo = lastSeenSeqNo;
        }

        public Long getWatermark() {
            return this.watermark;
        }

        public void setWatermark(Long watermark) {
            this.watermark = watermark;
        }

        public int getFactoryId() {
            return KinesisDataSerializerHook.FACTORY_ID;
        }

        public int getClassId() {
            // NOTE(review): presumably matches the id registered for this class in
            // KinesisDataSerializerHook - confirm before changing.
            return 0;
        }

        /** Writes the fields in a fixed order; {@link #readData} must read them in the same order. */
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeUTF(this.shardId);
            out.writeObject(this.startingHashKey);
            out.writeBoolean(this.closed);
            out.writeUTF(this.lastSeenSeqNo);
            out.writeObject(this.watermark);
        }

        /** Reads the fields in the order in which {@link #writeData} wrote them. */
        public void readData(ObjectDataInput in) throws IOException {
            this.shardId = in.readUTF();
            this.startingHashKey = (BigInteger)in.readObject();
            this.closed = in.readBoolean();
            this.lastSeenSeqNo = in.readUTF();
            this.watermark = (Long)in.readObject();
        }

        /**
         * Human readable form used in log messages. The sequence number and the
         * watermark are printed only for shards that are not closed.
         */
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append(this.shardId).append(": ");
            sb.append("startingHashKey=").append(this.startingHashKey);
            sb.append(", closed=").append(this.closed);
            if (!this.closed) {
                sb.append(", lastSeenSeqNo=").append(this.lastSeenSeqNo);
                if (this.watermark != null) {
                    sb.append(", watermark=").append(com.hazelcast.jet.impl.util.Util.toLocalTime((long)this.watermark));
                }
            }
            return sb.toString();
        }
    }

    /** Collection of {@link ShardState}s keyed by shard id. */
    private static class ShardStates {
        private final Map<String, ShardState> states = new HashMap<>();

        private ShardStates() {
        }

        /** Records progress for an open shard, creating its state if needed. */
        void update(Shard shard, String seqNo, Long watermark) {
            BigInteger startingHashKey = new BigInteger(shard.getHashKeyRange().getStartingHashKey());
            this.update(shard.getShardId(), startingHashKey, false, seqNo, watermark);
        }

        /** Marks the shard as closed and clears its sequence number and watermark. */
        void close(Shard shard) {
            BigInteger startingHashKey = new BigInteger(shard.getHashKeyRange().getStartingHashKey());
            this.update(shard.getShardId(), startingHashKey, true, null, null);
        }

        /** Overwrites every field of the shard's state, creating the state if it does not exist yet. */
        void update(String shardId, BigInteger startingHashKey, boolean closed, String lastSeenSeqNo, Long watermark) {
            ShardState state = this.states.computeIfAbsent(shardId, ShardState::new);
            state.setStartingHashKey(startingHashKey);
            state.setClosed(closed);
            state.setLastSeenSeqNo(lastSeenSeqNo);
            state.setWatermark(watermark);
        }

        /**
         * Drops the state of the given shard.
         *
         * @throws JetException if no state is recorded for the shard
         */
        void remove(String shardId) {
            ShardState state = this.states.remove(shardId);
            if (state == null) {
                throw new JetException("Removing nonexistent state for shard " + shardId + ", shouldn't happen");
            }
        }

        /** Returns the recorded state of the shard, or {@link ShardState#EMPTY} if there is none. */
        ShardState get(String shardId) {
            return this.states.getOrDefault(shardId, ShardState.EMPTY);
        }

        /** Streams all states as snapshot entries keyed by a broadcast key built from the shard id. */
        Stream<Map.Entry<BroadcastKey<String>, ShardState>> snapshotEntries() {
            return this.states.entrySet().stream().map(e -> Util.entry(BroadcastKey.broadcastKey(e.getKey()), e.getValue()));
        }

        public String toString() {
            return this.states.values().stream().map(ShardState::toString).collect(Collectors.joining(", "));
        }
    }
}

