package software.amazon.kinesis.leases;

import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

@KinesisClientInternalApi
/* loaded from: input_file:software/amazon/kinesis/leases/HierarchicalShardSyncer.class */
public class HierarchicalShardSyncer {
    private static final Logger log = LoggerFactory.getLogger(HierarchicalShardSyncer.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:software/amazon/kinesis/leases/HierarchicalShardSyncer$StartingSequenceNumberAndShardIdBasedComparator.class */
    /**
     * Orders leases by the starting sequence number of the shard their lease key maps to,
     * using the lease key itself as the tie-breaker (and as the only criterion when either
     * shard is missing from the lookup map).
     */
    public static class StartingSequenceNumberAndShardIdBasedComparator implements Comparator<Lease>, Serializable {
        private static final long serialVersionUID = 1;
        private final Map<String, Shard> shardIdToShardMap;

        /**
         * @param map lookup from shard id (lease key) to shard, consulted on every comparison
         */
        public StartingSequenceNumberAndShardIdBasedComparator(Map<String, Shard> map) {
            this.shardIdToShardMap = map;
        }

        /**
         * Compares two leases by shard starting sequence number, then by lease key.
         *
         * @param lease first lease
         * @param lease2 second lease
         * @return negative, zero or positive per the {@link Comparator} contract
         */
        @Override
        public int compare(Lease lease, Lease lease2) {
            String firstKey = lease.leaseKey();
            String secondKey = lease2.leaseKey();
            Shard firstShard = this.shardIdToShardMap.get(firstKey);
            Shard secondShard = this.shardIdToShardMap.get(secondKey);

            // Sequence numbers are only comparable when both shards are known.
            if (firstShard != null && secondShard != null) {
                BigInteger firstStart = new BigInteger(firstShard.sequenceNumberRange().startingSequenceNumber());
                BigInteger secondStart = new BigInteger(secondShard.sequenceNumberRange().startingSequenceNumber());
                int bySequenceNumber = firstStart.compareTo(secondStart);
                if (bySequenceNumber != 0) {
                    return bySequenceNumber;
                }
            }
            return firstKey.compareTo(secondKey);
        }
    }

    /**
     * Lists the stream's shards, creates any leases that are missing for them, and then
     * removes leases that are no longer needed.
     *
     * <p>Steps, in order: fetch the shard list, derive the parent-to-children map and the set
     * of inconsistent shard ids, list the existing leases, create each newly determined lease
     * (recording a "CreateLease" success/latency metric per attempt), delete garbage leases,
     * and optionally delete leases of finished shards.
     *
     * @param shardDetector source of the shard list; must not be null
     * @param leaseRefresher used to list, create and delete leases
     * @param initialPositionInStreamExtended initial position used when checkpointing new leases
     * @param z when true, leases of finished shards are cleaned up as the last step
     * @param z2 when false, any inconsistent shard id causes a {@link KinesisClientLibIOException};
     *           when true that assertion is skipped
     * @param metricsScope scope that receives the per-lease "CreateLease" metric
     * @throws NullPointerException if {@code shardDetector} is null
     */
    public synchronized void checkAndCreateLeaseForNewShards(@NonNull ShardDetector shardDetector, LeaseRefresher leaseRefresher, InitialPositionInStreamExtended initialPositionInStreamExtended, boolean z, boolean z2, MetricsScope metricsScope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
        if (shardDetector == null) {
            throw new NullPointerException("shardDetector");
        }
        List<Shard> shards = getShardList(shardDetector);
        log.debug("Num shards: {}", shards.size());

        Map<String, Shard> shardsById = constructShardIdToShardMap(shards);
        Map<String, Set<String>> childIdsByParentId = constructShardIdToChildShardIdsMap(shardsById);
        Set<String> inconsistentShardIds = findInconsistentShardIds(childIdsByParentId, shardsById);
        if (!z2) {
            assertAllParentShardsAreClosed(inconsistentShardIds);
        }

        List<Lease> currentLeases = leaseRefresher.listLeases();
        List<Lease> newLeases = determineNewLeasesToCreate(shards, currentLeases, initialPositionInStreamExtended, inconsistentShardIds);
        log.debug("Num new leases to create: {}", newLeases.size());

        for (Lease newLease : newLeases) {
            long startTimeMillis = System.currentTimeMillis();
            boolean success = false;
            try {
                leaseRefresher.createLeaseIfNotExists(newLease);
                success = true;
            } finally {
                // Emitted exactly once per attempt, whether or not lease creation threw.
                MetricsUtil.addSuccessAndLatency(metricsScope, "CreateLease", success, startTimeMillis, MetricsLevel.DETAILED);
            }
        }

        // Cleanup works on the pre-existing leases plus the ones just created.
        List<Lease> trackedLeases = new ArrayList<>(currentLeases);
        trackedLeases.addAll(newLeases);
        cleanupGarbageLeases(shardDetector, shards, trackedLeases, leaseRefresher);
        if (z) {
            cleanupLeasesOfFinishedShards(currentLeases, shardsById, childIdsByParentId, trackedLeases, leaseRefresher);
        }
    }

    /**
     * Fails when the supplied set of inconsistent shard ids is non-empty.
     *
     * @param set shard ids found to be inconsistent; null or empty means nothing to report
     * @throws KinesisClientLibIOException if {@code set} contains at least one id
     */
    private static void assertAllParentShardsAreClosed(Set<String> set) throws KinesisClientLibIOException {
        if (CollectionUtils.isNullOrEmpty(set)) {
            return;
        }
        String joinedIds = StringUtils.join(set, ' ');
        String message = String.format("%d open child shards (%s) are inconsistent. This can happen due to a race condition between describeStream and a reshard operation.", set.size(), joinedIds);
        throw new KinesisClientLibIOException(message);
    }

    /**
     * Collects the child shard ids of every parent that is either keyed by {@code null} or
     * whose shard has no ending sequence number.
     *
     * @param map parent shard id to child shard ids
     * @param map2 shard id to shard, used to look up each parent
     * @return the union of the child id sets of all such parents
     */
    private static Set<String> findInconsistentShardIds(Map<String, Set<String>> map, Map<String, Shard> map2) {
        Set<String> inconsistentChildIds = new HashSet<>();
        for (Map.Entry<String, Set<String>> parentEntry : map.entrySet()) {
            String parentId = parentEntry.getKey();
            boolean parentNotClosed = parentId == null
                    || map2.get(parentId).sequenceNumberRange().endingSequenceNumber() == null;
            if (parentNotClosed) {
                inconsistentChildIds.addAll(parentEntry.getValue());
            }
        }
        return inconsistentChildIds;
    }

    /**
     * Verifies, for each given shard id, that the shard is either absent from the shard map
     * or is closed, has children, and has its hash key range covered by those children.
     *
     * @param map shard id to shard
     * @param map2 parent shard id to child shard ids
     * @param set shard ids expected to be closed
     * @throws KinesisClientLibIOException if a listed shard has no ending sequence number,
     *         has no entry in {@code map2}, or fails the hash range coverage check
     */
    synchronized void assertClosedShardsAreCoveredOrAbsent(Map<String, Shard> map, Map<String, Set<String>> map2, Set<String> set) throws KinesisClientLibIOException {
        final String reshardRaceSuffix = "This can happen if we constructed the list of shards  while a reshard operation was in progress.";
        for (String shardId : set) {
            Shard shard = map.get(shardId);
            if (shard == null) {
                log.info("Shard {} is not present in Kinesis anymore.", shardId);
                continue;
            }
            if (shard.sequenceNumberRange().endingSequenceNumber() == null) {
                // Report the offending shard, not the whole set that is being checked.
                throw new KinesisClientLibIOException("Shard " + shardId + " is not closed. " + reshardRaceSuffix);
            }
            Set<String> childShardIds = map2.get(shardId);
            if (childShardIds == null) {
                throw new KinesisClientLibIOException("Incomplete shard list: Closed shard " + shardId + " has no children. " + reshardRaceSuffix);
            }
            assertHashRangeOfClosedShardIsCovered(shard, map, childShardIds);
        }
    }

    /**
     * Checks that the smallest starting hash key and the largest ending hash key among the
     * child shards enclose the hash key range of the closed shard.
     *
     * @param shard the closed shard whose range must be covered
     * @param map shard id to shard, used to resolve each child id
     * @param set ids of the child shards
     * @throws KinesisClientLibIOException if there are no children or their combined
     *         min-start/max-end does not enclose the closed shard's range
     */
    private synchronized void assertHashRangeOfClosedShardIsCovered(Shard shard, Map<String, Shard> map, Set<String> set) throws KinesisClientLibIOException {
        BigInteger parentStart = new BigInteger(shard.hashKeyRange().startingHashKey());
        BigInteger parentEnd = new BigInteger(shard.hashKeyRange().endingHashKey());

        BigInteger lowestChildStart = null;
        BigInteger highestChildEnd = null;
        for (String childId : set) {
            Shard child = map.get(childId);
            BigInteger childStart = new BigInteger(child.hashKeyRange().startingHashKey());
            if (lowestChildStart == null || childStart.compareTo(lowestChildStart) < 0) {
                lowestChildStart = childStart;
            }
            BigInteger childEnd = new BigInteger(child.hashKeyRange().endingHashKey());
            if (highestChildEnd == null || childEnd.compareTo(highestChildEnd) > 0) {
                highestChildEnd = childEnd;
            }
        }

        boolean covered = lowestChildStart != null
                && highestChildEnd != null
                && lowestChildStart.compareTo(parentStart) <= 0
                && highestChildEnd.compareTo(parentEnd) >= 0;
        if (!covered) {
            throw new KinesisClientLibIOException(String.format("Incomplete shard list: hash key range of shard %s is not covered by its child shards.", shard.shardId()));
        }
    }

    /**
     * Builds a map from each parent shard id to the ids of the shards that name it as their
     * parent or adjacent parent. Parents that are not keys of the input map are ignored.
     *
     * @param map shard id to shard
     * @return parent shard id to the set of its child shard ids
     */
    static Map<String, Set<String>> constructShardIdToChildShardIdsMap(Map<String, Shard> map) {
        Map<String, Set<String>> childIdsByParentId = new HashMap<>();
        for (Map.Entry<String, Shard> shardEntry : map.entrySet()) {
            String childId = shardEntry.getKey();
            Shard child = shardEntry.getValue();
            // Same handling for the parent and the adjacent parent, in that order.
            String[] candidateParentIds = {child.parentShardId(), child.adjacentParentShardId()};
            for (String candidateParentId : candidateParentIds) {
                if (candidateParentId == null || !map.containsKey(candidateParentId)) {
                    continue;
                }
                childIdsByParentId.computeIfAbsent(candidateParentId, unused -> new HashSet<>()).add(childId);
            }
        }
        return childIdsByParentId;
    }

    /**
     * Fetches the shard list from the detector, treating a null result as a retryable failure.
     *
     * @param shardDetector source of the shard list; must not be null
     * @return the non-null list returned by {@code shardDetector.listShards()}
     * @throws NullPointerException if {@code shardDetector} is null
     * @throws KinesisClientLibIOException if the detector returns null
     */
    private static List<Shard> getShardList(@NonNull ShardDetector shardDetector) throws KinesisClientLibIOException {
        if (shardDetector == null) {
            throw new NullPointerException("shardDetector");
        }
        List<Shard> shards = shardDetector.listShards();
        if (shards != null) {
            return shards;
        }
        throw new KinesisClientLibIOException("Stream is not in ACTIVE OR UPDATING state - will retry getting the shard list.");
    }

    /**
     * Determines which leases must be created for the open shards (and, through
     * {@code checkIfDescendantAndAddNewLeasesForAncestors}, for their ancestors).
     *
     * <p>An open shard is skipped when a lease with its id already exists or when its id is in
     * the inconsistent set. Otherwise a new lease is created; its checkpoint is TRIM_HORIZON
     * when the shard is reported as a descendant and the initial position is not AT_TIMESTAMP,
     * and the converted initial position in every other case.
     *
     * @param list all shards
     * @param list2 existing leases
     * @param initialPositionInStreamExtended initial position used for new checkpoints
     * @param set ids of inconsistent shards for which no lease is created
     * @return new leases sorted by {@link StartingSequenceNumberAndShardIdBasedComparator}
     */
    static List<Lease> determineNewLeasesToCreate(List<Shard> list, List<Lease> list2, InitialPositionInStreamExtended initialPositionInStreamExtended, Set<String> set) {
        Map<String, Lease> newLeasesByShardId = new HashMap<>();
        Map<String, Shard> shardsById = constructShardIdToShardMap(list);
        Set<String> existingLeaseKeys = list2.stream()
                .peek(existing -> log.debug("Existing lease: {}", existing))
                .map(Lease::leaseKey)
                .collect(Collectors.toSet());
        List<Shard> openShards = getOpenShards(list);
        Map<String, Boolean> descendantMemo = new HashMap<>();

        for (Shard openShard : openShards) {
            String shardId = openShard.shardId();
            log.debug("Evaluating leases for open shard {} and its ancestors.", shardId);
            if (existingLeaseKeys.contains(shardId)) {
                log.debug("Lease for shardId {} already exists. Not creating a lease", shardId);
                continue;
            }
            if (set.contains(shardId)) {
                log.info("shardId {} is an inconsistent child.  Not creating a lease", shardId);
                continue;
            }

            log.debug("Need to create a lease for shardId {}", shardId);
            Lease newLease = newKCLLease(openShard);
            // This call must always run: it also registers leases for ancestors as a side effect.
            boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPositionInStreamExtended, existingLeaseKeys, shardsById, newLeasesByShardId, descendantMemo);
            if (isDescendant && !initialPositionInStreamExtended.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
                newLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
            } else {
                newLease.checkpoint(convertToCheckpoint(initialPositionInStreamExtended));
            }
            log.debug("Set checkpoint of {} to {}", newLease.leaseKey(), newLease.checkpoint());
            newLeasesByShardId.put(shardId, newLease);
        }

        List<Lease> orderedNewLeases = new ArrayList<>(newLeasesByShardId.values());
        orderedNewLeases.sort(new StartingSequenceNumberAndShardIdBasedComparator(shardsById));
        return orderedNewLeases;
    }

    /**
     * Convenience overload that determines new leases with no shard ids marked inconsistent.
     *
     * @param list all shards
     * @param list2 existing leases
     * @param initialPositionInStreamExtended initial position used for new checkpoints
     * @return new leases, as produced by the four-argument overload with an empty set
     */
    static List<Lease> determineNewLeasesToCreate(List<Shard> list, List<Lease> list2, InitialPositionInStreamExtended initialPositionInStreamExtended) {
        Set<String> noInconsistentShardIds = new HashSet<>();
        return determineNewLeasesToCreate(list, list2, initialPositionInStreamExtended, noInconsistentShardIds);
    }

    /**
     * Recursively decides whether the given shard counts as a "descendant" and, while doing
     * so, adds leases for ancestor shards to {@code map2}.
     *
     * <p>Result rules, as implemented below:
     * <ul>
     *   <li>a result already stored in {@code map3} for this id is returned as-is;</li>
     *   <li>a null id, or an id that is not a key of {@code map}, yields {@code false};</li>
     *   <li>an id contained in {@code set} yields {@code true};</li>
     *   <li>otherwise the result is {@code true} if any parent is a descendant, or if no parent
     *       is a descendant and the initial position is TRIM_HORIZON or AT_TIMESTAMP.</li>
     * </ul>
     *
     * <p>Side effects: when at least one parent is a descendant, every parent whose id is not
     * in {@code set} gets a lease in {@code map2} (created if missing) and that lease's
     * checkpoint is (re)assigned. The computed result is always stored in {@code map3}.
     *
     * @param str id of the shard to evaluate; may be null
     * @param initialPositionInStreamExtended initial position used for checkpoints and the fallback rule
     * @param set shard ids for which a lease already exists
     * @param map shard id to shard
     * @param map2 shard id to newly created lease; mutated by this method
     * @param map3 memoization of results per shard id; mutated by this method
     * @return whether the shard is considered a descendant per the rules above
     */
    static boolean checkIfDescendantAndAddNewLeasesForAncestors(String str, InitialPositionInStreamExtended initialPositionInStreamExtended, Set<String> set, Map<String, Shard> map, Map<String, Lease> map2, Map<String, Boolean> map3) {
        // Reuse a previously computed answer for this shard id.
        Boolean bool = map3.get(str);
        if (bool != null) {
            return bool.booleanValue();
        }
        boolean z = false;
        // Ids of the parents that were themselves found to be descendants.
        HashSet hashSet = new HashSet();
        if (str != null && map.containsKey(str)) {
            if (set.contains(str)) {
                // A lease already exists for this shard.
                z = true;
            } else {
                Set<String> parentShardIds = getParentShardIds(map.get(str), map);
                // First pass: evaluate every parent (recursing up the ancestry).
                for (String str2 : parentShardIds) {
                    if (checkIfDescendantAndAddNewLeasesForAncestors(str2, initialPositionInStreamExtended, set, map, map2, map3)) {
                        z = true;
                        hashSet.add(str2);
                        log.debug("Parent shard {} is a descendant.", str2);
                    } else {
                        log.debug("Parent shard {} is NOT a descendant.", str2);
                    }
                }
                if (z) {
                    // Second pass: make sure each parent without an existing lease has a new one.
                    for (String str3 : parentShardIds) {
                        if (!set.contains(str3)) {
                            log.debug("Need to create a lease for shardId {}", str3);
                            Lease lease = map2.get(str3);
                            if (lease == null) {
                                lease = newKCLLease(map.get(str3));
                                map2.put(str3, lease);
                            }
                            // Descendant parents start at TRIM_HORIZON unless the position is AT_TIMESTAMP;
                            // all other parents use the converted initial position.
                            if (!hashSet.contains(str3) || initialPositionInStreamExtended.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
                                lease.checkpoint(convertToCheckpoint(initialPositionInStreamExtended));
                            } else {
                                lease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);
                            }
                        }
                    }
                } else if (initialPositionInStreamExtended.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON) || initialPositionInStreamExtended.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
                    // No descendant parent: still treated as a descendant for these two initial positions.
                    z = true;
                }
            }
        }
        // Memoize, including negative results and the null-id case.
        map3.put(str, Boolean.valueOf(z));
        return z;
    }

    /**
     * Returns the ids of the shard's parent and adjacent parent that are present as keys in
     * the given shard map.
     *
     * @param shard shard whose parents are requested
     * @param map shard id to shard; parents absent from it are left out
     * @return a set of zero, one or two parent shard ids
     */
    static Set<String> getParentShardIds(Shard shard, Map<String, Shard> map) {
        Set<String> knownParentIds = new HashSet<>(2);
        String[] candidateParentIds = {shard.parentShardId(), shard.adjacentParentShardId()};
        for (String candidateParentId : candidateParentIds) {
            if (candidateParentId != null && map.containsKey(candidateParentId)) {
                knownParentIds.add(candidateParentId);
            }
        }
        return knownParentIds;
    }

    /**
     * Deletes leases whose shard is no longer listed for the stream.
     *
     * <p>Candidates are first selected against the supplied shard list. If there are any, the
     * shard list is fetched again from the detector and only leases that are still candidates
     * against the refreshed list are deleted.
     *
     * @param shardDetector used to re-fetch the shard list; must not be null
     * @param list shards known at the time of the call
     * @param list2 leases to examine
     * @param leaseRefresher used to delete leases
     * @throws NullPointerException if {@code shardDetector} is null
     * @throws KinesisClientLibIOException propagated from {@code isCandidateForCleanup} or
     *         {@code getShardList}
     */
    private static void cleanupGarbageLeases(@NonNull ShardDetector shardDetector, List<Shard> list, List<Lease> list2, LeaseRefresher leaseRefresher) throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
        if (shardDetector == null) {
            throw new NullPointerException("shardDetector");
        }
        Set<String> knownShardIds = list.stream().map(Shard::shardId).collect(Collectors.toSet());
        List<Lease> cleanupCandidates = list2.stream()
                .filter(trackedLease -> isCandidateForCleanup(trackedLease, knownShardIds))
                .collect(Collectors.toList());
        if (CollectionUtils.isNullOrEmpty(cleanupCandidates)) {
            return;
        }

        log.info("Found {} candidate leases for cleanup. Refreshing list of Kinesis shards to pick up recent/latest shards", cleanupCandidates.size());
        Set<String> refreshedShardIds = getShardList(shardDetector).stream().map(Shard::shardId).collect(Collectors.toSet());
        for (Lease candidate : cleanupCandidates) {
            if (!isCandidateForCleanup(candidate, refreshedShardIds)) {
                continue;
            }
            log.info("Deleting lease for shard {} as it is not present in Kinesis stream.", candidate.leaseKey());
            leaseRefresher.deleteLease(candidate);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether a lease refers to a shard that is not in the given set of shard ids.
     *
     * @param lease lease to examine
     * @param set ids of the currently known shards
     * @return {@code false} if the lease key is in {@code set}; {@code true} if neither the
     *         lease key nor any of the lease's parent shard ids is in {@code set}
     * @throws KinesisClientLibIOException if the lease key is absent from {@code set} but one
     *         of its parent shard ids is present
     */
    public static boolean isCandidateForCleanup(Lease lease, Set<String> set) throws KinesisClientLibIOException {
        if (set.contains(lease.leaseKey())) {
            return false;
        }
        log.info("Found lease for non-existent shard: {}. Checking its parent shards", lease.leaseKey());
        for (String parentShardId : lease.parentShardIds()) {
            if (!set.contains(parentShardId)) {
                continue;
            }
            String message = String.format("Parent shard %s exists but not the child shard %s", parentShardId, lease.leaseKey());
            log.info(message);
            throw new KinesisClientLibIOException(message);
        }
        return true;
    }

    /**
     * Attempts to clean up the leases whose checkpoint is SHARD_END.
     *
     * <p>After validating those shards with {@code assertClosedShardsAreCoveredOrAbsent}, the
     * finished leases are sorted with {@link StartingSequenceNumberAndShardIdBasedComparator}
     * and each one that has a non-empty child set is handed to
     * {@code cleanupLeaseForClosedShard}.
     *
     * @param collection leases to scan for a SHARD_END checkpoint
     * @param map shard id to shard
     * @param map2 parent shard id to child shard ids
     * @param list all tracked leases, indexed by lease key for child lookups
     * @param leaseRefresher used to delete leases
     */
    private synchronized void cleanupLeasesOfFinishedShards(Collection<Lease> collection, Map<String, Shard> map, Map<String, Set<String>> map2, List<Lease> list, LeaseRefresher leaseRefresher) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
        List<Lease> finishedLeases = collection.stream()
                .filter(candidate -> candidate.checkpoint().equals(ExtendedSequenceNumber.SHARD_END))
                .collect(Collectors.toList());
        if (CollectionUtils.isNullOrEmpty(finishedLeases)) {
            return;
        }

        Set<String> finishedShardIds = finishedLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
        assertClosedShardsAreCoveredOrAbsent(map, map2, finishedShardIds);

        finishedLeases.sort(new StartingSequenceNumberAndShardIdBasedComparator(map));
        Map<String, Lease> trackedLeasesByKey = list.stream().collect(Collectors.toMap(Lease::leaseKey, Function.identity()));
        for (Lease finishedLease : finishedLeases) {
            String closedShardId = finishedLease.leaseKey();
            Set<String> childShardIds = map2.get(closedShardId);
            if (closedShardId == null || CollectionUtils.isNullOrEmpty(childShardIds)) {
                continue;
            }
            cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeasesByKey, leaseRefresher);
        }
    }

    /**
     * Deletes the lease of a closed shard once it is fully processed and all of its children
     * have leases that have moved past TRIM_HORIZON.
     *
     * <p>The lease is deleted only when all of the following hold: a lease exists for
     * {@code str}, its checkpoint equals SHARD_END, every id in {@code set} has a lease in
     * {@code map}, and none of those child leases has a TRIM_HORIZON checkpoint.
     *
     * @param str id of the closed shard
     * @param set ids of the closed shard's children
     * @param map lease key to lease, used to look up the parent and child leases
     * @param leaseRefresher used to delete the lease
     */
    synchronized void cleanupLeaseForClosedShard(String str, Set<String> set, Map<String, Lease> map, LeaseRefresher leaseRefresher) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        Lease closedShardLease = map.get(str);
        List<Lease> childLeases = set.stream()
                .map(map::get)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        boolean parentFinished = closedShardLease != null
                && closedShardLease.checkpoint().equals(ExtendedSequenceNumber.SHARD_END);
        // A size mismatch means at least one child has no lease yet.
        if (!parentFinished || childLeases.size() != set.size()) {
            return;
        }

        boolean everyChildStarted = childLeases.stream()
                .noneMatch(childLease -> childLease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON));
        if (everyChildStarted) {
            log.info("Deleting lease for shard {} as it has been completely processed and processing of child shards has begun.", closedShardLease.leaseKey());
            leaseRefresher.deleteLease(closedShardLease);
        }
    }

    /**
     * Creates a lease for the given shard: the lease key is the shard id, the parent shard
     * ids are the shard's non-null parent and adjacent parent (in that order), and the
     * owner-switch counter is set to zero. No checkpoint is assigned here.
     *
     * @param shard shard to create a lease for
     * @return the newly populated lease
     */
    private static Lease newKCLLease(Shard shard) {
        List<String> parentIds = new ArrayList<>(2);
        String parentId = shard.parentShardId();
        if (parentId != null) {
            parentIds.add(parentId);
        }
        String adjacentParentId = shard.adjacentParentShardId();
        if (adjacentParentId != null) {
            parentIds.add(adjacentParentId);
        }

        Lease newLease = new Lease();
        newLease.leaseKey(shard.shardId());
        newLease.parentShardIds(parentIds);
        newLease.ownerSwitchesSinceCheckpoint(0L);
        return newLease;
    }

    /**
     * Indexes the given shards by shard id.
     *
     * @param list shards to index
     * @return map from shard id to shard, built with {@link Collectors#toMap}
     */
    static Map<String, Shard> constructShardIdToShardMap(List<Shard> list) {
        Function<Shard, String> shardIdOf = Shard::shardId;
        return list.stream().collect(Collectors.toMap(shardIdOf, Function.identity()));
    }

    /**
     * Selects the shards that have no ending sequence number, logging each one at debug level.
     *
     * @param list shards to filter
     * @return the shards without an ending sequence number, in their original order
     */
    static List<Shard> getOpenShards(List<Shard> list) {
        List<Shard> openShards = new ArrayList<>();
        for (Shard candidate : list) {
            if (candidate.sequenceNumberRange().endingSequenceNumber() != null) {
                continue;
            }
            log.debug("Found open shard: {}", candidate.shardId());
            openShards.add(candidate);
        }
        return openShards;
    }

    /**
     * Maps the configured initial position to the matching sentinel checkpoint.
     *
     * @param initialPositionInStreamExtended configured initial position
     * @return TRIM_HORIZON, LATEST or AT_TIMESTAMP to match the position, or {@code null} if
     *         the position is none of those three
     */
    private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended initialPositionInStreamExtended) {
        InitialPositionInStream position = initialPositionInStreamExtended.getInitialPositionInStream();
        if (position.equals(InitialPositionInStream.TRIM_HORIZON)) {
            return ExtendedSequenceNumber.TRIM_HORIZON;
        }
        if (position.equals(InitialPositionInStream.LATEST)) {
            return ExtendedSequenceNumber.LATEST;
        }
        if (position.equals(InitialPositionInStream.AT_TIMESTAMP)) {
            return ExtendedSequenceNumber.AT_TIMESTAMP;
        }
        return null;
    }
}
