/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.lookup;

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.graylog2.lookup.LookupTable;
import org.graylog2.plugin.lookup.LookupCache;
import org.graylog2.plugin.lookup.LookupCacheKey;
import org.graylog2.plugin.lookup.LookupCachePurge;
import org.graylog2.plugin.lookup.LookupDataAdapter;
import org.graylog2.utilities.ObjectUtils;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LookupDataAdapterRefreshService
extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(LookupDataAdapterRefreshService.class);
    private final ScheduledExecutorService scheduler;
    private final ConcurrentMap<String, LookupTable> liveTables;
    private final ConcurrentMap<String, ScheduledFuture<?>> futures = new ConcurrentHashMap();

    public LookupDataAdapterRefreshService(ScheduledExecutorService scheduler, ConcurrentMap<String, LookupTable> liveTables) {
        this.scheduler = scheduler;
        this.liveTables = liveTables;
    }

    protected void startUp() throws Exception {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void shutDown() throws Exception {
        ConcurrentMap<String, ScheduledFuture<?>> concurrentMap = this.futures;
        synchronized (concurrentMap) {
            LOG.info("Stopping {} jobs", (Object)this.futures.size());
            for (ScheduledFuture future : this.futures.values()) {
                this.cancel(future);
            }
            this.futures.clear();
        }
    }

    public Listener newServiceListener(LookupDataAdapter adapter) {
        return new Listener(this, adapter);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void add(LookupDataAdapter dataAdapter) {
        if (this.state() == Service.State.STOPPING || this.state() == Service.State.TERMINATED) {
            LOG.debug("Service is in state <{}> - not adding new job for <{}/{}/@{}>", new Object[]{this.state(), dataAdapter.name(), dataAdapter.id(), ObjectUtils.objectId((Object)dataAdapter)});
            return;
        }
        Duration interval = dataAdapter.refreshInterval();
        if (!interval.equals((Object)Duration.ZERO)) {
            String instanceId = ObjectUtils.objectId((Object)dataAdapter);
            ConcurrentMap<String, ScheduledFuture<?>> concurrentMap = this.futures;
            synchronized (concurrentMap) {
                if (!this.futures.containsKey(instanceId)) {
                    LOG.info("Adding job for <{}/{}/@{}> [interval={}ms]", new Object[]{dataAdapter.name(), dataAdapter.id(), instanceId, interval.getMillis()});
                    this.futures.put(instanceId, this.schedule(dataAdapter, interval));
                } else {
                    LOG.warn("Job for <{}/{}/@{}> already exists, not adding it again.", new Object[]{dataAdapter.name(), dataAdapter.id(), instanceId});
                }
            }
        }
    }

    public void remove(LookupDataAdapter dataAdapter) {
        if (this.state() == Service.State.STOPPING || this.state() == Service.State.TERMINATED) {
            LOG.debug("Service is in state <{}> - not removing job for <{}/{}/@{}>", new Object[]{this.state(), dataAdapter.name(), dataAdapter.id(), ObjectUtils.objectId((Object)dataAdapter)});
            return;
        }
        String instanceId = ObjectUtils.objectId((Object)dataAdapter);
        if (this.futures.containsKey(instanceId)) {
            LOG.info("Removing job for <{}/{}/@{}>", new Object[]{dataAdapter.name(), dataAdapter.id(), instanceId});
        }
        this.cancel((ScheduledFuture)this.futures.remove(instanceId));
    }

    private ScheduledFuture<?> schedule(LookupDataAdapter dataAdapter, Duration interval) {
        CachePurge cachePurge = new CachePurge(this.liveTables, dataAdapter);
        return this.scheduler.scheduleAtFixedRate(() -> {
            try {
                dataAdapter.refresh(cachePurge);
            }
            catch (Exception e) {
                LOG.warn("Unhandled error while refreshing <{}/{}/@{}>", new Object[]{dataAdapter.name(), dataAdapter.id(), ObjectUtils.objectId((Object)dataAdapter), e});
            }
        }, interval.getMillis(), interval.getMillis(), TimeUnit.MILLISECONDS);
    }

    private void cancel(@Nullable ScheduledFuture<?> future) {
        if (future != null && !future.isCancelled() && !future.cancel(true)) {
            LOG.warn("Could not cancel refresh job");
        }
    }

    private static class CachePurge
    implements LookupCachePurge {
        private final ConcurrentMap<String, LookupTable> tables;
        private final LookupDataAdapter adapter;

        CachePurge(ConcurrentMap<String, LookupTable> tables, LookupDataAdapter adapter) {
            this.tables = tables;
            this.adapter = adapter;
        }

        @Override
        public void purgeAll() {
            this.caches().forEach(cache -> cache.purge(LookupCacheKey.prefix(this.adapter.name())));
        }

        @Override
        public void purgeKey(Object key) {
            this.caches().forEach(cache -> cache.purge(LookupCacheKey.create(this.adapter.name(), key)));
        }

        private Stream<LookupCache> caches() {
            return this.tables.values().stream().filter(table -> table.dataAdapter().id().equals(this.adapter.id())).map(LookupTable::cache);
        }
    }

    public static class Listener
    extends Service.Listener {
        private final LookupDataAdapterRefreshService refreshService;
        private final LookupDataAdapter adapter;

        public Listener(LookupDataAdapterRefreshService refreshService, LookupDataAdapter adapter) {
            this.refreshService = refreshService;
            this.adapter = adapter;
        }

        public void running() {
            this.refreshService.add(this.adapter);
        }

        public void stopping(Service.State from) {
            this.refreshService.remove(this.adapter);
        }
    }
}

