/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.kinesis.impl.source;

import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.services.kinesis.model.ShardFilterType;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.kinesis.impl.AbstractShardWorker;
import com.hazelcast.jet.kinesis.impl.KinesisUtil;
import com.hazelcast.jet.kinesis.impl.RandomizedRateTracker;
import com.hazelcast.jet.kinesis.impl.RetryTracker;
import com.hazelcast.jet.kinesis.impl.source.HashRange;
import com.hazelcast.jet.kinesis.impl.source.ShardQueue;
import com.hazelcast.jet.kinesis.impl.source.ShardTracker;
import com.hazelcast.jet.retry.RetryStrategy;
import com.hazelcast.logging.ILogger;
import java.math.BigInteger;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Lists the shards of a Kinesis stream and reports, to the appropriate
 * {@link ShardQueue}, shards that newly appeared in this member's hash range
 * as well as shards the {@link ShardTracker} considers expired.
 *
 * <p>{@link #run()} performs at most one small step per invocation: it either
 * starts an asynchronous {@code ListShards} request or, once the pending
 * request has completed, consumes its result. It never waits on the pending
 * future; it only polls {@link Future#isDone()}.
 */
public class RangeMonitor extends AbstractShardWorker {

    // NOTE(review): presumably the ListShards rate limit imposed by Kinesis,
    // and the fraction of it this source is willing to consume - confirm
    // against the AWS service quotas.
    private static final int SHARD_LISTINGS_ALLOWED_PER_SECOND = 100;
    private static final double RATIO_OF_SHARD_LISTING_RATE_UTILIZED = 0.1;

    private final ShardTracker shardTracker;
    private final HashRange memberHashRange;
    private final ShardQueue[] shardQueues;
    private final RandomizedRateTracker listShardsRateTracker;
    private final RetryTracker listShardRetryTracker;

    /** Pagination token of the listing in progress; {@code null} when the next request starts a fresh listing. */
    private String nextToken;
    /** The in-flight {@code ListShards} call; {@code null} when no request is pending. */
    private Future<ListShardsResult> listShardsResult;
    /** Earliest wall-clock time (epoch millis) at which the next {@code ListShards} request may be issued. */
    private long nextListShardsTimeMs;

    /**
     * @param totalInstances  scales the period over which shard listings are spread
     *                        (one second per instance)
     * @param kinesis         client used to issue the asynchronous {@code ListShards} calls
     * @param stream          name of the stream whose shards are listed
     * @param memberHashRange only shards belonging to this range are tracked
     * @param rangePartitions handed to the {@link ShardTracker}
     * @param shardQueues     queues indexed by the owner index the {@link ShardTracker} reports
     * @param retryStrategy   governs retries of failed {@code ListShards} calls
     * @param logger          logger passed on to the superclass
     */
    public RangeMonitor(int totalInstances, AmazonKinesisAsync kinesis, String stream, HashRange memberHashRange, HashRange[] rangePartitions, ShardQueue[] shardQueues, RetryStrategy retryStrategy, ILogger logger) {
        super(kinesis, stream, logger);
        this.memberHashRange = memberHashRange;
        this.shardTracker = new ShardTracker(rangePartitions);
        this.shardQueues = shardQueues;
        this.listShardRetryTracker = new RetryTracker(retryStrategy);
        this.listShardsRateTracker = initRandomizedTracker(totalInstances);
        // The first listing may be issued right away.
        this.nextListShardsTimeMs = System.currentTimeMillis();
    }

    /**
     * Performs one step of the monitoring cycle: issues a new listing request
     * when none is pending (and the rate limit allows it), otherwise consumes
     * the pending request's result if it has completed.
     */
    public void run() {
        long currentTimeMs = System.currentTimeMillis();
        if (listShardsResult == null) {
            initShardListing(currentTimeMs);
        } else if (listShardsResult.isDone()) {
            processListShardsResult(currentTimeMs);
        }
    }

    /**
     * Reads the completed listing result and acts on it. Failures of type
     * {@link SdkClientException} go through the retry logic, any other
     * throwable is rethrown. In every case the pending future is cleared.
     */
    private void processListShardsResult(long currentTimeMs) {
        ListShardsResult result;
        try {
            result = KinesisUtil.readResult(listShardsResult);
        } catch (SdkClientException e) {
            dealWithListShardsFailure(e);
            return;
        } catch (Throwable t) {
            throw ExceptionUtil.rethrow(t);
        } finally {
            // Success or failure, the next run() must be free to issue a new request.
            listShardsResult = null;
        }

        listShardRetryTracker.reset();
        checkForNewShards(currentTimeMs, result);

        nextToken = result.getNextToken();
        if (nextToken == null) {
            // No further pages: the listing is complete, so expiry can be evaluated.
            checkForExpiredShards(currentTimeMs);
        }
    }

    /**
     * Issues the next {@code ListShards} request, unless the scheduled time
     * for it has not been reached yet.
     */
    private void initShardListing(long currentTimeMs) {
        if (currentTimeMs < nextListShardsTimeMs) {
            return;
        }
        listShardsResult = listAllShardsAsync(nextToken);
        nextListShardsTimeMs = currentTimeMs + listShardsRateTracker.next();
    }

    private Future<ListShardsResult> listAllShardsAsync(String nextToken) {
        ListShardsRequest request = listAllShardsRequest(streamName, nextToken, ShardFilterType.FROM_TRIM_HORIZON);
        return kinesis.listShardsAsync(request);
    }

    /**
     * Builds a {@code ListShards} request carrying the given shard filter type.
     *
     * @param stream     stream name; set on the request only when {@code nextToken} is {@code null}
     * @param nextToken  pagination token of a listing in progress, or {@code null} to start a new one
     * @param filterType type of the {@link ShardFilter} attached to the request
     * @return the populated request
     */
    public static ListShardsRequest listAllShardsRequest(String stream, @Nullable String nextToken, ShardFilterType filterType) {
        ListShardsRequest request = new ListShardsRequest();
        // The request carries either the stream name or the token, never both.
        // NOTE(review): the ListShards API is understood to reject requests
        // specifying both - confirm against the AWS API reference.
        if (nextToken == null) {
            request.setStreamName(stream);
        } else {
            request.setNextToken(nextToken);
        }
        request.setShardFilter(new ShardFilter().withType(filterType));
        return request;
    }

    /**
     * Feeds the listed shards that belong to this member's hash range into the
     * shard tracker and forwards those it reports as new to their owner's queue.
     */
    private void checkForNewShards(long currentTimeMs, ListShardsResult result) {
        Set<Shard> shards = result.getShards().stream()
                .filter(shard -> KinesisUtil.shardBelongsToRange(shard, memberHashRange))
                .collect(Collectors.toSet());
        Map<Shard, Integer> newShards = shardTracker.markDetections(shards, currentTimeMs);
        if (newShards.isEmpty()) {
            return;
        }

        logger.info("New shards detected: " + newShards.keySet().stream().map(Shard::getShardId).collect(Collectors.joining(", ")));
        for (Map.Entry<Shard, Integer> e : newShards.entrySet()) {
            Shard shard = e.getKey();
            int owner = e.getValue();
            shardQueues[owner].addAdded(shard);
        }
    }

    /**
     * Removes the shards the tracker considers expired at the given time and
     * notifies each one's owner queue.
     */
    private void checkForExpiredShards(long currentTimeMs) {
        Map<String, Integer> expiredShards = shardTracker.removeExpiredShards(currentTimeMs);
        for (Map.Entry<String, Integer> e : expiredShards.entrySet()) {
            String shardId = e.getKey();
            int owner = e.getValue();
            logger.info("Expired shard detected: " + shardId);
            shardQueues[owner].addExpired(shardId);
        }
    }

    /**
     * Registers a shard with the tracker as known but not yet detected by a
     * listing, stamped with the current time.
     *
     * @param shardId         id of the shard
     * @param startingHashKey starting hash key of the shard
     */
    public void addKnownShard(String shardId, BigInteger startingHashKey) {
        shardTracker.addUndetected(shardId, startingHashKey, System.currentTimeMillis());
    }

    /**
     * Handles a failed {@code ListShards} call: drops the pagination token (so
     * the next attempt starts a fresh listing), then either rethrows the
     * failure when the retry tracker allows no further attempt, or schedules
     * the next attempt after the wait time the tracker prescribes.
     */
    private void dealWithListShardsFailure(@Nonnull Exception failure) {
        nextToken = null;
        listShardRetryTracker.attemptFailed();
        if (!listShardRetryTracker.shouldTryAgain()) {
            throw ExceptionUtil.rethrow(failure);
        }
        long timeoutMillis = listShardRetryTracker.getNextWaitTimeMillis();
        logger.warning(String.format("Failed listing shards, retrying in %d ms. Cause: %s", timeoutMillis, failure.getMessage()));
        nextListShardsTimeMs = System.currentTimeMillis() + timeoutMillis;
    }

    /**
     * Creates the tracker that paces shard listings. The period grows with the
     * number of instances (one second each) and the count per period is the
     * utilized fraction of the allowed listing rate.
     * NOTE(review): presumably this keeps the combined rate of all instances
     * under the service limit - confirm against RandomizedRateTracker.
     */
    @Nonnull
    private static RandomizedRateTracker initRandomizedTracker(int totalInstances) {
        long periodMs = TimeUnit.SECONDS.toMillis(1L) * (long) totalInstances;
        // Evaluates to 10 with the current constants.
        int listingsPerPeriod = (int) (SHARD_LISTINGS_ALLOWED_PER_SECOND * RATIO_OF_SHARD_LISTING_RATE_UTILIZED);
        return new RandomizedRateTracker(periodMs, listingsPerPeriod);
    }
}

