package io.cdap.cdap.common.async;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:io/cdap/cdap/common/async/KeyedExecutor.class */
public class KeyedExecutor<K> {
    private final ScheduledExecutorService scheduler;
    private final ConcurrentMap<K, CompletableFuture<Void>> futures = new ConcurrentHashMap();

    public KeyedExecutor(ScheduledExecutorService scheduledExecutorService) {
        this.scheduler = scheduledExecutorService;
    }

    public Future<Void> execute(K k, Runnable runnable) {
        return submit(k, () -> {
            runnable.run();
            return -1L;
        });
    }

    public Future<Void> submit(K k, final RepeatedTask repeatedTask) {
        final AtomicReference atomicReference = new AtomicReference();
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final CompletableFuture<Void> completableFuture = new CompletableFuture<Void>() { // from class: io.cdap.cdap.common.async.KeyedExecutor.1
            @Override // java.util.concurrent.CompletableFuture, java.util.concurrent.Future
            public boolean cancel(boolean z) {
                if (atomicBoolean.compareAndSet(false, true)) {
                    Optional.ofNullable(atomicReference.get()).ifPresent(future -> {
                        future.cancel(z);
                    });
                }
                return super.cancel(z);
            }
        };
        CompletableFuture<Void> putIfAbsent = this.futures.putIfAbsent(k, completableFuture);
        if (putIfAbsent != null) {
            return putIfAbsent;
        }
        atomicReference.set(this.scheduler.submit(new Runnable() { // from class: io.cdap.cdap.common.async.KeyedExecutor.2
            @Override // java.lang.Runnable
            public void run() {
                if (atomicBoolean.get()) {
                    return;
                }
                try {
                    long executeOnce = repeatedTask.executeOnce();
                    if (executeOnce < 0) {
                        completableFuture.complete(null);
                    } else {
                        if (!atomicBoolean.get()) {
                            atomicReference.set(KeyedExecutor.this.scheduler.schedule(this, executeOnce, TimeUnit.MILLISECONDS));
                        }
                    }
                } catch (Exception e) {
                    completableFuture.completeExceptionally(e);
                }
            }
        }));
        completableFuture.handle((r7, th) -> {
            return Boolean.valueOf(this.futures.remove(k, completableFuture));
        });
        return completableFuture;
    }

    public Optional<Future<Void>> getFuture(K k) {
        return Optional.ofNullable(this.futures.get(k));
    }

    public void shutdown() {
        this.scheduler.shutdown();
    }

    public List<Runnable> shutdownNow() {
        return this.scheduler.shutdownNow();
    }

    public boolean awaitTermination(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.scheduler.awaitTermination(j, timeUnit);
    }
}
