/*
 * Decompiled with CFR 0.152.
 */
package orestes.bloomfilter.cachesketch;

import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import orestes.bloomfilter.FilterBuilder;
import orestes.bloomfilter.TimeMap;
import orestes.bloomfilter.cachesketch.AbstractExpiringBloomFilterRedis;
import orestes.bloomfilter.redis.MessagePackEncoder;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Tuple;
import selogger.org.slf4j.Logger;
import selogger.org.slf4j.LoggerFactory;

public class ExpiringBloomFilterPureRedis
extends AbstractExpiringBloomFilterRedis<String> {
    private static final long MAX_JOB_DELAY = TimeUnit.MILLISECONDS.convert(60L, TimeUnit.SECONDS);
    private static final long MIN_JOB_DELAY = 100L;
    private static final Logger LOG = LoggerFactory.getLogger(ExpiringBloomFilterPureRedis.class);
    private final String expireQueueScript = this.loadLuaScript("expireQueue.lua");
    private final Random random = new Random();
    private final MessagePackEncoder msgPack = new MessagePackEncoder();
    private ScheduledFuture<?> job;
    private boolean isEnabled;

    public ExpiringBloomFilterPureRedis(FilterBuilder builder) {
        super(builder);
        this.enableJob();
    }

    @Override
    public void clear() {
        try (Jedis jedis = this.pool.getResource();){
            jedis.del(new String[]{this.keys.EXPIRATION_QUEUE_KEY, this.keys.COUNTS_KEY, this.keys.BITS_KEY, this.keys.TTL_KEY});
        }
    }

    @Override
    public void softClear() {
        try (Jedis jedis = this.pool.getResource();){
            jedis.del(new String[]{this.keys.COUNTS_KEY, this.keys.BITS_KEY, this.keys.EXPIRATION_QUEUE_KEY});
        }
    }

    @Override
    public synchronized void remove() {
        super.remove();
        if (this.job != null) {
            this.job.cancel(true);
            this.job = null;
        }
    }

    @Override
    public void addToQueue(String item, long remaining, TimeUnit timeUnit) {
        try (Jedis jedis = this.pool.getResource();){
            byte[] member;
            boolean done;
            do {
                int[] positions = this.hash(item.getBytes());
                member = this.msgPack.encodeItem(item, positions);
            } while (!(done = jedis.zadd(this.keys.EXPIRATION_QUEUE_KEY.getBytes(), (double)(this.now() + TimeUnit.MILLISECONDS.convert(remaining, timeUnit)), member) == 1L));
        }
        this.triggerExpirationHandling(remaining, timeUnit);
    }

    public synchronized boolean onExpire() {
        long now = this.now();
        LOG.debug("[{}] Expiring items ... {}", (Object)this.config.name(), (Object)now);
        try (Jedis jedis = this.pool.getResource();){
            long expiredItems = (Long)jedis.evalsha(this.expireQueueScript, 3, new String[]{this.keys.EXPIRATION_QUEUE_KEY, this.keys.COUNTS_KEY, this.keys.BITS_KEY, String.valueOf(now)});
            LOG.debug("[{}] Script expired {} items within {}ms", this.config.name(), expiredItems, this.now() - now);
            boolean bl = true;
            return bl;
        }
    }

    @Override
    public TimeMap<String> getExpirationMap() {
        try (Jedis jedis = this.pool.getResource();){
            TimeMap<String> timeMap = jedis.zrangeWithScores(this.keys.EXPIRATION_QUEUE_KEY.getBytes(), 0L, -1L).stream().collect(TimeMap.collectMillis(tuple -> this.msgPack.decodeItem(tuple.getBinaryElement()), tuple -> (long)tuple.getScore()));
            return timeMap;
        }
    }

    @Override
    public void setExpirationMap(TimeMap<String> map) {
        try (Jedis jedis = this.pool.getResource();){
            Pipeline pipeline = jedis.pipelined();
            AtomicInteger ctr = new AtomicInteger(0);
            map.forEach((item, expiration) -> {
                int[] positions = this.hash((String)item);
                pipeline.zadd(this.keys.EXPIRATION_QUEUE_KEY.getBytes(), (double)expiration.longValue(), this.msgPack.encodeItem((String)item, positions));
                if (ctr.incrementAndGet() >= 1000) {
                    ctr.set(0);
                    pipeline.sync();
                }
            });
            pipeline.sync();
        }
    }

    @Override
    public boolean setExpirationEnabled(boolean enabled) {
        return enabled ? this.enableJob() : this.disableJob();
    }

    private synchronized void triggerExpirationHandling(long delay, TimeUnit unit) {
        if (!this.isEnabled) {
            return;
        }
        long delayInMilliseconds = TimeUnit.MILLISECONDS.convert(delay, unit);
        long currentDelay = this.job.getDelay(TimeUnit.MILLISECONDS);
        if (currentDelay > delayInMilliseconds + 100L) {
            this.scheduleJob(false, delay, unit);
        }
    }

    private synchronized void scheduleJob(boolean shouldNotCancel, long delay, TimeUnit unit) {
        ScheduledFuture<?> currentJob = this.job;
        if (!shouldNotCancel && currentJob != null) {
            LOG.debug("[{}] Cancel active job", (Object)this.config.name());
            currentJob.cancel(false);
        }
        LOG.debug("[" + this.config.name() + "] Scheduled the next expiration job in " + TimeUnit.MILLISECONDS.convert(delay, unit) + "ms");
        this.job = this.scheduler.schedule(this::expirationJob, delay, unit);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void expirationJob() {
        long nextDelay = 100L;
        try {
            boolean success = this.onExpire();
            nextDelay = success ? this.estimateNextDelay() : 100L;
        }
        catch (Exception e) {
            LOG.error("[" + this.config.name() + "] Error in script", e);
        }
        finally {
            this.scheduleJob(true, nextDelay, TimeUnit.MILLISECONDS);
        }
    }

    private long estimateNextDelay() {
        try (Jedis jedis = this.pool.getResource();){
            long now = this.now();
            long max = now + MAX_JOB_DELAY;
            Set nextQueueItems = jedis.zrangeByScoreWithScores(this.keys.EXPIRATION_QUEUE_KEY, 0.0, (double)max, 0, 5);
            if (nextQueueItems.isEmpty()) {
                LOG.debug("[{}] Queue empty, next try in {}ms", (Object)this.config.name(), (Object)MAX_JOB_DELAY);
                long l = MAX_JOB_DELAY;
                return l;
            }
            long next = nextQueueItems.stream().skip(this.random.nextInt(nextQueueItems.size())).findFirst().map(Tuple::getScore).map(it -> Math.min(MAX_JOB_DELAY, Math.max(100L, it.longValue() - now))).orElse(MAX_JOB_DELAY);
            LOG.debug("[{}] Estimated a next delay of {}ms", (Object)this.config.name(), (Object)next);
            long l = next;
            return l;
        }
    }

    private synchronized boolean enableJob() {
        if (this.isEnabled) {
            return false;
        }
        LOG.debug("Enabling expiration queue");
        this.isEnabled = true;
        this.scheduleJob(true, this.estimateNextDelay(), TimeUnit.MILLISECONDS);
        return true;
    }

    private synchronized boolean disableJob() {
        if (!this.isEnabled) {
            return false;
        }
        LOG.debug("Disabling expiration queue");
        this.isEnabled = false;
        this.job.cancel(false);
        this.job = null;
        return true;
    }
}

