/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.spout;

import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link KafkaSpoutRetryService} that spaces the retries of failed messages using an exponential backoff.
 *
 * <p>The delay applied before a retry is:
 * <pre>
 *   failCount == 1 : initialDelay
 *   failCount  > 1 : delayPeriod * 2^(failCount - 1)
 * </pre>
 * and is never longer than {@code maxDelay}. A message whose fail count exceeds {@code maxRetries}
 * is not scheduled.
 *
 * <p>Pending retries are kept in {@code retrySchedules}, ordered by the time at which they become
 * ready, and mirrored by id in {@code toRetryMsgs} for fast membership checks.
 */
public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
    private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
    private TimeInterval initialDelay;
    private TimeInterval delayPeriod;
    private TimeInterval maxDelay;
    private int maxRetries;
    // Pending retries, ordered by next retry time (earliest first).
    private Set<RetrySchedule> retrySchedules = new TreeSet<RetrySchedule>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
    // Ids of the messages that currently have an entry in retrySchedules.
    private Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<KafkaSpoutMessageId>();
    // Creation counter handed to each RetrySchedule so that entries sharing a timestamp stay distinct
    // in the TreeSet. Transient: bookkeeping only, so the class's serialized form is unaffected.
    private transient long nextSequence;

    /**
     * Creates a retry service.
     *
     * @param initialDelay delay applied before the first retry of a message
     * @param delayPeriod  base delay that is doubled for every failure after the first
     * @param maxRetries   highest fail count for which a message is still scheduled
     * @param maxDelay     upper bound on the delay before any retry
     */
    public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
        this.initialDelay = initialDelay;
        this.delayPeriod = delayPeriod;
        this.maxRetries = maxRetries;
        this.maxDelay = maxDelay;
        LOG.debug("Instantiated {}", this);
    }

    /**
     * Returns the topic partitions that have at least one message whose retry time has been reached.
     *
     * @return a new set of topic partitions; empty when nothing is ready
     */
    @Override
    public Set<TopicPartition> retriableTopicPartitions() {
        // HashSet, not TreeSet: TopicPartition has no natural ordering, so a comparator-less
        // TreeSet would throw ClassCastException on insertion.
        final Set<TopicPartition> tps = new HashSet<TopicPartition>();
        final long currentTimeNanos = System.nanoTime();
        for (RetrySchedule retrySchedule : this.retrySchedules) {
            if (!retrySchedule.retry(currentTimeNanos)) {
                break; // entries are time-ordered, so nothing after this one is ready either
            }
            final KafkaSpoutMessageId msgId = retrySchedule.msgId();
            tps.add(new TopicPartition(msgId.topic(), msgId.partition()));
        }
        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tps);
        return tps;
    }

    /**
     * Tells whether the given message is scheduled and its retry time has been reached.
     *
     * @param msgId id of the message to check
     * @return {@code true} if the message can be retried now
     */
    @Override
    public boolean isReady(KafkaSpoutMessageId msgId) {
        if (!this.toRetryMsgs.contains(msgId)) {
            return false;
        }
        final long currentTimeNanos = System.nanoTime();
        for (RetrySchedule retrySchedule : this.retrySchedules) {
            if (!retrySchedule.retry(currentTimeNanos)) {
                // Entries are time-ordered: once one is not ready, the message cannot be ready.
                LOG.debug("Entry to retry not found {}", retrySchedule);
                return false;
            }
            if (retrySchedule.msgId().equals(msgId)) {
                LOG.debug("Found entry to retry {}", retrySchedule);
                return true;
            }
        }
        return false;
    }

    /**
     * Tells whether the given message is currently scheduled for retry, ready or not.
     *
     * @param msgId id of the message to check
     * @return {@code true} if the message is scheduled
     */
    @Override
    public boolean isScheduled(KafkaSpoutMessageId msgId) {
        return this.toRetryMsgs.contains(msgId);
    }

    /**
     * Removes the given message from the retry schedule.
     *
     * @param msgId id of the message to remove
     * @return {@code true} if an entry was removed
     */
    @Override
    public boolean remove(KafkaSpoutMessageId msgId) {
        final boolean removed = removeSchedule(msgId);
        LOG.debug(removed ? "Removed {} " : "Not removed {}", msgId);
        LOG.trace("Current state {}", this.retrySchedules);
        return removed;
    }

    /**
     * Drops every scheduled message whose topic partition is not in the given collection.
     *
     * @param topicPartitions topic partitions whose scheduled messages are kept
     * @return {@code true} if at least one entry was removed
     */
    @Override
    public boolean retainAll(Collection<TopicPartition> topicPartitions) {
        boolean result = false;
        final Iterator<RetrySchedule> rsIterator = this.retrySchedules.iterator();
        while (rsIterator.hasNext()) {
            final RetrySchedule retrySchedule = rsIterator.next();
            final KafkaSpoutMessageId msgId = retrySchedule.msgId();
            final TopicPartition tpRetry = new TopicPartition(msgId.topic(), msgId.partition());
            if (topicPartitions.contains(tpRetry)) {
                continue;
            }
            rsIterator.remove();
            this.toRetryMsgs.remove(msgId);
            LOG.debug("Removed {}", retrySchedule);
            LOG.trace("Current state {}", this.retrySchedules);
            result = true;
        }
        return result;
    }

    /**
     * Schedules the given message for retry, replacing any entry it already has. Nothing is
     * scheduled when the message's fail count exceeds {@code maxRetries}.
     *
     * @param msgId id of the failed message
     */
    @Override
    public void schedule(KafkaSpoutMessageId msgId) {
        if (msgId.numFails() > this.maxRetries) {
            LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, this.maxRetries);
            return;
        }
        // Drop the stale entry first so the message is only ever scheduled once.
        removeSchedule(msgId);
        final RetrySchedule retrySchedule = new RetrySchedule(msgId, this.nextTime(msgId));
        this.retrySchedules.add(retrySchedule);
        this.toRetryMsgs.add(msgId);
        LOG.debug("Scheduled. {}", retrySchedule);
        LOG.trace("Current state {}", this.retrySchedules);
    }

    /** Removes the schedule entry of {@code msgId}, if any, keeping both collections in sync. */
    private boolean removeSchedule(KafkaSpoutMessageId msgId) {
        if (!this.toRetryMsgs.contains(msgId)) {
            return false;
        }
        final Iterator<RetrySchedule> iterator = this.retrySchedules.iterator();
        while (iterator.hasNext()) {
            if (iterator.next().msgId().equals(msgId)) {
                iterator.remove();
                this.toRetryMsgs.remove(msgId);
                return true;
            }
        }
        return false;
    }

    /**
     * Computes the {@link System#nanoTime()} based time stamp at which the message may be retried.
     *
     * @param msgId id of the failed message; its fail count selects the delay
     * @return current time plus the backoff delay, the delay being capped at {@code maxDelay}
     */
    private long nextTime(KafkaSpoutMessageId msgId) {
        final long currentTimeNanos = System.nanoTime();
        final long maxDelayNanos = this.maxDelay.lengthNanos();
        final long delayNanos;
        if (msgId.numFails() == 1) {
            delayNanos = this.initialDelay.lengthNanos();
        } else {
            // Computed in double and clamped before narrowing so that a large fail count
            // cannot overflow long arithmetic.
            final double backoff = this.delayPeriod.lengthNanos() * Math.pow(2, msgId.numFails() - 1);
            delayNanos = (long) Math.min(backoff, (double) maxDelayNanos);
        }
        return currentTimeNanos + Math.min(delayNanos, maxDelayNanos);
    }

    @Override
    public String toString() {
        return "KafkaSpoutRetryExponentialBackoff{delay=" + this.initialDelay + ", ratio=" + this.delayPeriod + ", maxRetries=" + this.maxRetries + ", maxRetryDelay=" + this.maxDelay + '}';
    }

    /**
     * A length of time expressed in a {@link TimeUnit}, with its nanosecond equivalent
     * computed once at construction.
     */
    public static class TimeInterval implements Serializable {
        private long lengthNanos;
        private long length;
        private TimeUnit timeUnit;

        /**
         * @param length   length of the interval, in {@code timeUnit}
         * @param timeUnit unit in which {@code length} is expressed
         */
        public TimeInterval(long length, TimeUnit timeUnit) {
            this.length = length;
            this.timeUnit = timeUnit;
            this.lengthNanos = timeUnit.toNanos(length);
        }

        /** Returns an interval of {@code length} seconds. */
        public static TimeInterval seconds(long length) {
            return new TimeInterval(length, TimeUnit.SECONDS);
        }

        /** Returns an interval of {@code length} milliseconds. */
        public static TimeInterval milliSeconds(long length) {
            return new TimeInterval(length, TimeUnit.MILLISECONDS);
        }

        /** Returns an interval of {@code length} microseconds. */
        public static TimeInterval microSeconds(long length) {
            return new TimeInterval(length, TimeUnit.MICROSECONDS);
        }

        /** Returns the interval length converted to nanoseconds. */
        public long lengthNanos() {
            return this.lengthNanos;
        }

        /** Returns the interval length in {@link #timeUnit()}. */
        public long length() {
            return this.length;
        }

        /** Returns the unit in which {@link #length()} is expressed. */
        public TimeUnit timeUnit() {
            return this.timeUnit;
        }

        @Override
        public String toString() {
            return "TimeInterval{length=" + this.length + ", timeUnit=" + this.timeUnit + '}';
        }
    }

    /** A message awaiting retry together with the time at which it becomes ready. */
    private class RetrySchedule {
        private KafkaSpoutMessageId msgId;
        private long nextRetryTimeNanos;
        // Creation order within the enclosing instance; breaks ties between equal time stamps.
        private final long sequence;

        public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
            this.msgId = msgId;
            this.nextRetryTimeNanos = nextRetryTime;
            this.sequence = KafkaSpoutRetryExponentialBackoff.this.nextSequence++;
            LOG.debug("Created {}", this);
        }

        /**
         * Recomputes the retry time from the message's current fail count.
         * NOTE(review): must not be called while this entry is inside {@code retrySchedules},
         * because the set is ordered by this value.
         */
        public void setNextRetryTime() {
            this.nextRetryTimeNanos = KafkaSpoutRetryExponentialBackoff.this.nextTime(this.msgId);
            LOG.debug("Updated {}", this);
        }

        /** Returns {@code true} when the retry time is at or before {@code currentTimeNanos}. */
        public boolean retry(long currentTimeNanos) {
            return this.nextRetryTimeNanos <= currentTimeNanos;
        }

        @Override
        public String toString() {
            return "RetrySchedule{msgId=" + this.msgId + ", nextRetryTime=" + this.nextRetryTimeNanos + '}';
        }

        public KafkaSpoutMessageId msgId() {
            return this.msgId;
        }

        public long nextRetryTimeNanos() {
            return this.nextRetryTimeNanos;
        }
    }

    /**
     * Orders retry entries by retry time, earliest first. Entries with the same retry time are
     * ordered by creation sequence: a TreeSet treats a comparison result of 0 as "same element",
     * so without the tie-breaker a second entry with an identical time stamp would be dropped.
     */
    private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
        private RetryEntryTimeStampComparator() {
        }

        @Override
        public int compare(RetrySchedule entry1, RetrySchedule entry2) {
            final int byTime = Long.compare(entry1.nextRetryTimeNanos(), entry2.nextRetryTimeNanos());
            return byTime != 0 ? byTime : Long.compare(entry1.sequence, entry2.sequence);
        }
    }
}

