/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.spout;

import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.Validate;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSpoutRetryExponentialBackoff
implements KafkaSpoutRetryService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
    private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
    private final TimeInterval initialDelay;
    private final TimeInterval delayPeriod;
    private final TimeInterval maxDelay;
    private final int maxRetries;
    private final Set<RetrySchedule> retrySchedules = new TreeSet<RetrySchedule>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
    private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<KafkaSpoutMessageId>();

    public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
        this.initialDelay = initialDelay;
        this.delayPeriod = delayPeriod;
        this.maxRetries = maxRetries;
        this.maxDelay = maxDelay;
        LOG.debug("Instantiated {}", (Object)this.toStringImpl());
    }

    @Override
    public Map<TopicPartition, Long> earliestRetriableOffsets() {
        HashMap<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<TopicPartition, Long>();
        long currentTimeNanos = Time.nanoTime();
        for (RetrySchedule retrySchedule : this.retrySchedules) {
            if (!retrySchedule.retry(currentTimeNanos)) break;
            KafkaSpoutMessageId msgId = retrySchedule.msgId;
            TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
            tpToEarliestRetriableOffset.merge(tpForMessage, msgId.offset(), Math::min);
        }
        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
        return tpToEarliestRetriableOffset;
    }

    @Override
    public boolean isReady(KafkaSpoutMessageId msgId) {
        boolean retry = false;
        if (this.isScheduled(msgId)) {
            long currentTimeNanos = Time.nanoTime();
            for (RetrySchedule retrySchedule : this.retrySchedules) {
                if (retrySchedule.retry(currentTimeNanos)) {
                    if (!retrySchedule.msgId.equals(msgId)) continue;
                    retry = true;
                    LOG.debug("Found entry to retry {}", (Object)retrySchedule);
                    break;
                }
                LOG.debug("Entry to retry not found {}", (Object)retrySchedule);
                break;
            }
        }
        return retry;
    }

    @Override
    public boolean isScheduled(KafkaSpoutMessageId msgId) {
        return this.toRetryMsgs.contains(msgId);
    }

    @Override
    public boolean remove(KafkaSpoutMessageId msgId) {
        boolean removed = false;
        if (this.isScheduled(msgId)) {
            this.toRetryMsgs.remove(msgId);
            Iterator<RetrySchedule> iterator = this.retrySchedules.iterator();
            while (iterator.hasNext()) {
                RetrySchedule retrySchedule = iterator.next();
                if (!retrySchedule.msgId().equals(msgId)) continue;
                iterator.remove();
                removed = true;
                break;
            }
        }
        LOG.debug(removed ? "Removed {} " : "Not removed {}", (Object)msgId);
        LOG.trace("Current state {}", this.retrySchedules);
        return removed;
    }

    @Override
    public boolean retainAll(Collection<TopicPartition> topicPartitions) {
        boolean result = false;
        Iterator<RetrySchedule> rsIterator = this.retrySchedules.iterator();
        while (rsIterator.hasNext()) {
            RetrySchedule retrySchedule = rsIterator.next();
            KafkaSpoutMessageId msgId = retrySchedule.msgId;
            TopicPartition tpRetry = new TopicPartition(msgId.topic(), msgId.partition());
            if (topicPartitions.contains(tpRetry)) continue;
            rsIterator.remove();
            this.toRetryMsgs.remove(msgId);
            LOG.debug("Removed {}", (Object)retrySchedule);
            LOG.trace("Current state {}", this.retrySchedules);
            result = true;
        }
        return result;
    }

    @Override
    public boolean schedule(KafkaSpoutMessageId msgId) {
        if (msgId.numFails() > this.maxRetries) {
            LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", (Object)msgId, (Object)this.maxRetries);
            return false;
        }
        this.remove(msgId);
        RetrySchedule retrySchedule = new RetrySchedule(msgId, this.nextTime(msgId));
        this.retrySchedules.add(retrySchedule);
        this.toRetryMsgs.add(msgId);
        LOG.debug("Scheduled. {}", (Object)retrySchedule);
        LOG.trace("Current state {}", this.retrySchedules);
        return true;
    }

    @Override
    public int readyMessageCount() {
        int count = 0;
        long currentTimeNanos = Time.nanoTime();
        for (RetrySchedule retrySchedule : this.retrySchedules) {
            if (!retrySchedule.retry(currentTimeNanos)) break;
            ++count;
        }
        return count;
    }

    @Override
    public KafkaSpoutMessageId getMessageId(TopicPartition tp, long offset) {
        KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(tp, offset);
        if (this.isScheduled(msgId)) {
            for (KafkaSpoutMessageId originalMsgId : this.toRetryMsgs) {
                if (!originalMsgId.equals(msgId)) continue;
                return originalMsgId;
            }
        }
        return msgId;
    }

    private long nextTime(KafkaSpoutMessageId msgId) {
        Validate.isTrue((msgId.numFails() > 0 ? 1 : 0) != 0, (String)"nextTime assumes the message has failed at least once");
        long currentTimeNanos = Time.nanoTime();
        long nextTimeNanos = msgId.numFails() == 1 ? currentTimeNanos + this.initialDelay.lengthNanos : currentTimeNanos + this.delayPeriod.lengthNanos * (long)Math.pow(2.0, msgId.numFails() - 1);
        return Math.min(nextTimeNanos, currentTimeNanos + this.maxDelay.lengthNanos);
    }

    public String toString() {
        return this.toStringImpl();
    }

    private String toStringImpl() {
        return "KafkaSpoutRetryExponentialBackoff{delay=" + this.initialDelay + ", ratio=" + this.delayPeriod + ", maxRetries=" + this.maxRetries + ", maxRetryDelay=" + this.maxDelay + "}";
    }

    public static class TimeInterval
    implements Serializable {
        private final long lengthNanos;
        private final TimeUnit timeUnit;
        private final long length;

        public TimeInterval(long length, TimeUnit timeUnit) {
            this.lengthNanos = timeUnit.toNanos(length);
            this.timeUnit = timeUnit;
            this.length = length;
        }

        public static TimeInterval seconds(long length) {
            return new TimeInterval(length, TimeUnit.SECONDS);
        }

        public static TimeInterval milliSeconds(long length) {
            return new TimeInterval(length, TimeUnit.MILLISECONDS);
        }

        public static TimeInterval microSeconds(long length) {
            return new TimeInterval(length, TimeUnit.MICROSECONDS);
        }

        public long lengthNanos() {
            return this.lengthNanos;
        }

        public TimeUnit timeUnit() {
            return this.timeUnit;
        }

        public String toString() {
            return "TimeInterval{length=" + this.length + ", timeUnit=" + this.timeUnit + "}";
        }
    }

    private class RetrySchedule {
        private final KafkaSpoutMessageId msgId;
        private long nextRetryTimeNanos;

        RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTimeNanos) {
            this.msgId = msgId;
            this.nextRetryTimeNanos = nextRetryTimeNanos;
            LOG.debug("Created {}", (Object)this);
        }

        public void setNextRetryTimeNanos() {
            this.nextRetryTimeNanos = KafkaSpoutRetryExponentialBackoff.this.nextTime(this.msgId);
            LOG.debug("Updated {}", (Object)this);
        }

        public boolean retry(long currentTimeNanos) {
            return this.nextRetryTimeNanos <= currentTimeNanos;
        }

        public String toString() {
            return "RetrySchedule{msgId=" + this.msgId + ", nextRetryTimeNanos=" + this.nextRetryTimeNanos + "}";
        }

        public KafkaSpoutMessageId msgId() {
            return this.msgId;
        }

        public long nextRetryTimeNanos() {
            return this.nextRetryTimeNanos;
        }
    }

    private static class RetryEntryTimeStampComparator
    implements Serializable,
    Comparator<RetrySchedule> {
        private RetryEntryTimeStampComparator() {
        }

        @Override
        public int compare(RetrySchedule entry1, RetrySchedule entry2) {
            int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
            if (result == 0) {
                result = entry1.hashCode() - entry2.hashCode();
            }
            return result;
        }
    }
}

