package org.apache.storm.kafka.spout;

import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.class */
public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);

    /** Shared, stateless ordering used by the sorted set of retry schedules. */
    private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();

    /** Delay applied when a message has failed exactly once. */
    private final TimeInterval initialDelay;

    /** Base of the backoff: its length is raised to the power {@code numFails - 1} for later failures. */
    private final TimeInterval delayPeriod;

    /** Upper bound on the delay between now and the next retry. */
    private final TimeInterval maxDelay;

    /** Messages whose failure count exceeds this value are refused by {@code schedule}. */
    private final int maxRetries;

    /** Pending retries, iterated in the order defined by {@link #RETRY_ENTRY_TIME_STAMP_COMPARATOR}. */
    private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);

    /** Ids of the messages that currently have an entry in {@link #retrySchedules}; gives a fast membership test. */
    private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>();

    /* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff$RetryEntryTimeStampComparator.class */
    /**
     * Orders retry schedules by ascending next-retry timestamp.
     *
     * <p>A sorted set treats elements that compare as {@code 0} as duplicates, so ordering on the
     * timestamp alone would silently discard a schedule whose timestamp equals that of an entry
     * already stored. Ties are therefore broken on the message id's hash code and, as a last
     * resort, on the identity hash of the schedule objects, so that distinct schedules only
     * compare as equal when even their identity hashes collide.
     */
    private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
        private RetryEntryTimeStampComparator() {
        }

        /**
         * Compares two schedules by retry time, falling back to tie-breakers when the times match.
         *
         * @param retrySchedule  first schedule
         * @param retrySchedule2 second schedule
         * @return negative, zero or positive as the first schedule sorts before, with, or after the second
         */
        @Override // java.util.Comparator
        public int compare(RetrySchedule retrySchedule, RetrySchedule retrySchedule2) {
            if (retrySchedule == retrySchedule2) {
                return 0;
            }
            // Primitive comparison: same ordering as the boxed compareTo, without the allocations.
            final int byTime = Long.compare(retrySchedule.nextRetryTimeNanos(), retrySchedule2.nextRetryTimeNanos());
            if (byTime != 0) {
                return byTime;
            }
            // Same timestamp: keep both entries by distinguishing them on the message id.
            final int byMsgId = Integer.compare(retrySchedule.msgId().hashCode(), retrySchedule2.msgId().hashCode());
            if (byMsgId != 0) {
                return byMsgId;
            }
            // Hash collision on distinct objects: fall back to object identity.
            return Integer.compare(System.identityHashCode(retrySchedule), System.identityHashCode(retrySchedule2));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff$RetrySchedule.class */
    /**
     * Associates a message id with the timestamp (in nanoseconds) from which it may be retried.
     *
     * <p>The timestamp is compared against {@link System#nanoTime()} readings by the enclosing class.
     */
    public class RetrySchedule {
        private final KafkaSpoutMessageId msgId;
        private long nextRetryTimeNanos;

        /**
         * Creates a schedule entry and logs it at debug level.
         *
         * @param kafkaSpoutMessageId id of the message to retry
         * @param j                   timestamp in nanoseconds from which the message may be retried
         */
        public RetrySchedule(KafkaSpoutMessageId kafkaSpoutMessageId, long j) {
            msgId = kafkaSpoutMessageId;
            nextRetryTimeNanos = j;
            LOG.debug("Created {}", this);
        }

        /** Returns the id of the message this entry refers to. */
        public KafkaSpoutMessageId msgId() {
            return msgId;
        }

        /** Returns the timestamp in nanoseconds from which the message may be retried. */
        public long nextRetryTimeNanos() {
            return nextRetryTimeNanos;
        }

        /**
         * Recomputes the retry timestamp from the enclosing service's backoff rule.
         *
         * <p>NOTE(review): the sorted set orders entries by this timestamp, so an entry that is
         * currently stored there presumably has to be removed and re-added around this call —
         * confirm with callers.
         */
        public void setNextRetryTime() {
            nextRetryTimeNanos = nextTime(msgId);
            LOG.debug("Updated {}", this);
        }

        /**
         * Tells whether the retry time has been reached.
         *
         * @param j the current time in nanoseconds
         * @return {@code true} when the retry timestamp is not later than {@code j}
         */
        public boolean retry(long j) {
            return j >= nextRetryTimeNanos;
        }

        public String toString() {
            final StringBuilder text = new StringBuilder("RetrySchedule{msgId=");
            text.append(msgId);
            text.append(", nextRetryTime=").append(nextRetryTimeNanos);
            text.append('}');
            return text.toString();
        }
    }

    /* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff$TimeInterval.class */
    /**
     * Immutable length of time expressed as a count plus a {@link TimeUnit}, with the equivalent
     * number of nanoseconds computed once at construction.
     */
    public static class TimeInterval implements Serializable {
        private final long lengthNanos;
        private final long length;
        private final TimeUnit timeUnit;

        /**
         * @param j        length of the interval, in {@code timeUnit} units
         * @param timeUnit unit in which {@code j} is expressed
         */
        public TimeInterval(long j, TimeUnit timeUnit) {
            // Convert first: a null unit fails here before any field is assigned.
            final long nanos = timeUnit.toNanos(j);
            this.lengthNanos = nanos;
            this.length = j;
            this.timeUnit = timeUnit;
        }

        /** Creates an interval of {@code j} seconds. */
        public static TimeInterval seconds(long j) {
            final TimeUnit unit = TimeUnit.SECONDS;
            return new TimeInterval(j, unit);
        }

        /** Creates an interval of {@code j} milliseconds. */
        public static TimeInterval milliSeconds(long j) {
            final TimeUnit unit = TimeUnit.MILLISECONDS;
            return new TimeInterval(j, unit);
        }

        /** Creates an interval of {@code j} microseconds. */
        public static TimeInterval microSeconds(long j) {
            final TimeUnit unit = TimeUnit.MICROSECONDS;
            return new TimeInterval(j, unit);
        }

        /** Returns the interval converted to nanoseconds. */
        public long lengthNanos() {
            return lengthNanos;
        }

        /** Returns the interval length in the unit given at construction. */
        public long length() {
            return length;
        }

        /** Returns the unit given at construction. */
        public TimeUnit timeUnit() {
            return timeUnit;
        }

        public String toString() {
            final StringBuilder text = new StringBuilder("TimeInterval{length=");
            text.append(length);
            text.append(", timeUnit=").append(timeUnit);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Creates a retry service with the given backoff settings and logs it at debug level.
     *
     * @param timeInterval  delay used when a message has failed exactly once
     * @param timeInterval2 interval whose length is the base of the exponential delay for later failures
     * @param i             maximum number of retries; messages with more failures are not scheduled
     * @param timeInterval3 upper bound on the delay before the next retry
     */
    public KafkaSpoutRetryExponentialBackoff(TimeInterval timeInterval, TimeInterval timeInterval2, int i, TimeInterval timeInterval3) {
        // Backoff timing first, then the retry limit.
        initialDelay = timeInterval;
        delayPeriod = timeInterval2;
        maxDelay = timeInterval3;
        maxRetries = i;
        // Logged last so that toString() sees every setting.
        LOG.debug("Instantiated {}", this);
    }

    /**
     * Collects the topic partitions that have at least one message whose retry time has been reached.
     *
     * @return a new set holding the partitions with entries ready to be retried; empty when none are ready
     */
    @Override // org.apache.storm.kafka.spout.KafkaSpoutRetryService
    public Set<TopicPartition> retriableTopicPartitions() {
        final Set<TopicPartition> readyPartitions = new HashSet<>();
        final long now = System.nanoTime();
        for (RetrySchedule schedule : retrySchedules) {
            if (!schedule.retry(now)) {
                // Entries are iterated in retry-time order, so nothing after this one is ready either.
                break;
            }
            final KafkaSpoutMessageId id = schedule.msgId();
            readyPartitions.add(new TopicPartition(id.topic(), id.partition()));
        }
        LOG.debug("Topic partitions with entries ready to be retried [{}] ", readyPartitions);
        return readyPartitions;
    }

    /**
     * Tells whether the given message is scheduled and its retry time has been reached.
     *
     * @param kafkaSpoutMessageId id of the message to look up
     * @return {@code true} if an entry for the message exists and is due; {@code false} otherwise
     */
    @Override // org.apache.storm.kafka.spout.KafkaSpoutRetryService
    public boolean isReady(KafkaSpoutMessageId kafkaSpoutMessageId) {
        if (!toRetryMsgs.contains(kafkaSpoutMessageId)) {
            return false;
        }
        final long now = System.nanoTime();
        for (RetrySchedule schedule : retrySchedules) {
            if (!schedule.retry(now)) {
                // Entries are iterated in retry-time order: the first one not yet due ends the search.
                LOG.debug("Entry to retry not found {}", schedule);
                return false;
            }
            if (schedule.msgId().equals(kafkaSpoutMessageId)) {
                // Stop at the match, so no unrelated "not found" line is logged afterwards.
                LOG.debug("Found entry to retry {}", schedule);
                return true;
            }
        }
        return false;
    }

    /**
     * Tells whether the given message currently has a retry entry, due or not.
     *
     * @param kafkaSpoutMessageId id of the message to look up
     * @return {@code true} if the message id is tracked by this service
     */
    @Override // org.apache.storm.kafka.spout.KafkaSpoutRetryService
    public boolean isScheduled(KafkaSpoutMessageId kafkaSpoutMessageId) {
        final boolean tracked = toRetryMsgs.contains(kafkaSpoutMessageId);
        return tracked;
    }

    /**
     * Drops the retry entry of the given message, if there is one.
     *
     * @param kafkaSpoutMessageId id of the message to stop tracking
     * @return {@code true} if an entry was found and removed; {@code false} otherwise
     */
    @Override // org.apache.storm.kafka.spout.KafkaSpoutRetryService
    public boolean remove(KafkaSpoutMessageId kafkaSpoutMessageId) {
        boolean removed = false;
        if (toRetryMsgs.contains(kafkaSpoutMessageId)) {
            // Stop at the first match; the flag is tested first so the cursor is not touched again.
            for (Iterator<RetrySchedule> cursor = retrySchedules.iterator(); !removed && cursor.hasNext(); ) {
                final RetrySchedule candidate = cursor.next();
                if (candidate.msgId().equals(kafkaSpoutMessageId)) {
                    cursor.remove();
                    toRetryMsgs.remove(kafkaSpoutMessageId);
                    removed = true;
                }
            }
        }
        if (removed) {
            LOG.debug("Removed {} ", kafkaSpoutMessageId);
        } else {
            LOG.debug("Not removed {}", kafkaSpoutMessageId);
        }
        LOG.trace("Current state {}", retrySchedules);
        return removed;
    }

    /**
     * Discards every retry entry whose topic partition is not in the given collection.
     *
     * @param collection the topic partitions whose entries are kept
     * @return {@code true} if at least one entry was removed; {@code false} otherwise
     */
    @Override // org.apache.storm.kafka.spout.KafkaSpoutRetryService
    public boolean retainAll(Collection<TopicPartition> collection) {
        boolean changed = false;
        for (Iterator<RetrySchedule> cursor = retrySchedules.iterator(); cursor.hasNext(); ) {
            final RetrySchedule schedule = cursor.next();
            final KafkaSpoutMessageId id = schedule.msgId();
            final TopicPartition partition = new TopicPartition(id.topic(), id.partition());
            if (collection.contains(partition)) {
                continue;
            }
            // Keep the sorted set and the id set in step with each other.
            cursor.remove();
            toRetryMsgs.remove(id);
            LOG.debug("Removed {}", schedule);
            LOG.trace("Current state {}", retrySchedules);
            changed = true;
        }
        return changed;
    }

    /**
     * Schedules the given message for retry, replacing any entry it already has.
     *
     * @param kafkaSpoutMessageId id of the failed message
     * @return {@code false} if the message's failure count exceeds the maximum number of retries
     *         (nothing is scheduled); {@code true} once the message has been scheduled
     */
    @Override // org.apache.storm.kafka.spout.KafkaSpoutRetryService
    public boolean schedule(KafkaSpoutMessageId kafkaSpoutMessageId) {
        final boolean retriesExhausted = kafkaSpoutMessageId.numFails() > maxRetries;
        if (retriesExhausted) {
            LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", kafkaSpoutMessageId, Integer.valueOf(maxRetries));
            return false;
        }

        // Discard any entry left over from an earlier failure of the same message.
        if (toRetryMsgs.contains(kafkaSpoutMessageId)) {
            for (Iterator<RetrySchedule> cursor = retrySchedules.iterator(); cursor.hasNext(); ) {
                final RetrySchedule existing = cursor.next();
                if (!existing.msgId().equals(kafkaSpoutMessageId)) {
                    continue;
                }
                cursor.remove();
                toRetryMsgs.remove(kafkaSpoutMessageId);
            }
        }

        final long retryAtNanos = nextTime(kafkaSpoutMessageId);
        final RetrySchedule entry = new RetrySchedule(kafkaSpoutMessageId, retryAtNanos);
        retrySchedules.add(entry);
        toRetryMsgs.add(kafkaSpoutMessageId);
        LOG.debug("Scheduled. {}", entry);
        LOG.trace("Current state {}", retrySchedules);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Computes the {@link System#nanoTime()}-based timestamp at which the given message may be retried.
     *
     * <p>The delay is the initial delay when the message has failed exactly once; otherwise it is
     * the delay period's length raised to the power {@code numFails - 1}, expressed in the delay
     * period's time unit. In both cases the delay is capped at the maximum delay.
     *
     * <p>The cap is applied to the delay itself, before the current time is added. For a high
     * failure count the exponential term saturates at {@code Long.MAX_VALUE}; adding the current
     * time to that would wrap around to a negative timestamp, which would win the minimum and make
     * the message due for retry immediately instead of after the maximum delay.
     *
     * @param kafkaSpoutMessageId id of the failed message; its failure count drives the backoff
     * @return the current nano time plus the capped delay
     */
    public long nextTime(KafkaSpoutMessageId kafkaSpoutMessageId) {
        final long now = System.nanoTime();
        final long delayNanos;
        if (kafkaSpoutMessageId.numFails() == 1) {
            delayNanos = initialDelay.lengthNanos();
        } else {
            // Both the double-to-long cast and toNanos() saturate rather than wrap on huge values.
            final long delayInUnits = (long) Math.pow(delayPeriod.length(), kafkaSpoutMessageId.numFails() - 1);
            delayNanos = delayPeriod.timeUnit().toNanos(delayInUnits);
        }
        return now + Math.min(delayNanos, maxDelay.lengthNanos());
    }

    /** Returns a description of the configured backoff settings. */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("KafkaSpoutRetryExponentialBackoff{delay=");
        text.append(initialDelay);
        text.append(", ratio=").append(delayPeriod);
        text.append(", maxRetries=").append(maxRetries);
        text.append(", maxRetryDelay=").append(maxDelay);
        text.append('}');
        return text.toString();
    }
}
