package org.apache.samza.zk;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.samza.config.JobConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/zk/ScheduleAfterDebounceTime.class */
/**
 * Runs named actions on a single daemon scheduler thread after a caller-supplied delay.
 *
 * <p>Scheduling an action under a name that already has a pending action first tries to cancel
 * the pending one, so at most one handle is tracked per action name. If an action throws, or the
 * scheduler thread is found interrupted after an action returns, the scheduler is stopped and the
 * registered {@link ScheduledTaskCallback} (if any) is notified.
 *
 * <p>The public scheduling/cancelling/stopping methods are {@code synchronized} on this instance.
 */
public class ScheduleAfterDebounceTime {
    private static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
    private static final String DEBOUNCE_THREAD_NAME_FORMAT = "Samza Debounce Thread-%s";

    /** Maximum time, in milliseconds, to wait for an action whose future could not be cancelled. */
    private static final int TIMEOUT_MS = 10000;

    /**
     * Error callback; starts out empty so that a task failure before any callback is registered
     * does not trigger a NullPointerException. Volatile because it is written by callers and read
     * on the scheduler thread.
     */
    private volatile Optional<ScheduledTaskCallback> scheduledTaskCallback = Optional.empty();
    private final ScheduledExecutorService scheduledExecutorService;

    /** Handles of scheduled actions, keyed by action name. */
    private final Map<String, ScheduledFuture<?>> futureHandles = new ConcurrentHashMap<>();
    private volatile boolean isShuttingDown = false;

    /** Callback notified when a scheduled action fails or is interrupted. */
    public interface ScheduledTaskCallback {
        /**
         * Invoked after the scheduler has been stopped because of a task failure.
         *
         * @param th the throwable raised by the action, or an {@link InterruptedException} when
         *           the scheduler thread was found interrupted after the action returned
         */
        void onError(Throwable th);
    }

    /**
     * Creates the scheduler backed by a single daemon thread.
     *
     * @param str identifier substituted into the scheduler thread's name
     */
    public ScheduleAfterDebounceTime(String str) {
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder()
                .setNameFormat(String.format(DEBOUNCE_THREAD_NAME_FORMAT, str))
                .setDaemon(true)
                .build());
    }

    /**
     * Registers the callback to notify on task failure.
     *
     * @param scheduledTaskCallback the callback; {@code null} clears any registered callback
     */
    public void setScheduledTaskCallback(ScheduledTaskCallback scheduledTaskCallback) {
        this.scheduledTaskCallback = Optional.ofNullable(scheduledTaskCallback);
    }

    /**
     * Schedules {@code runnable} to run after {@code j} milliseconds, replacing any action that is
     * still pending under the same name. Does nothing once the scheduler has been stopped.
     *
     * @param str      name of the action; used as the key for later cancellation
     * @param j        delay in milliseconds before the action runs
     * @param runnable the action to run
     */
    public synchronized void scheduleAfterDebounceTime(String str, long j, Runnable runnable) {
        if (this.isShuttingDown) {
            LOG.info("Scheduler is stopped. Not scheduling action: {} to run.", str);
            return;
        }
        tryCancelScheduledAction(str);
        ScheduledFuture<?> schedule =
            this.scheduledExecutorService.schedule(getScheduleableAction(str, runnable), j, TimeUnit.MILLISECONDS);
        LOG.info("Scheduled action: {} to run after: {} milliseconds.", str, j);
        this.futureHandles.put(str, schedule);
    }

    /**
     * Tries to cancel the pending action registered under the given name. Does nothing once the
     * scheduler has been stopped.
     *
     * @param str name of the action to cancel
     */
    public synchronized void cancelAction(String str) {
        if (this.isShuttingDown) {
            return;
        }
        tryCancelScheduledAction(str);
    }

    /**
     * Stops the scheduler: rejects further scheduling, shuts the executor down without
     * interrupting a running action, and cancels every pending action. Subsequent calls are
     * no-ops.
     */
    public synchronized void stopScheduler() {
        if (this.isShuttingDown) {
            LOG.debug("Debounce timer shutdown is already in progress!");
            return;
        }
        this.isShuttingDown = true;
        LOG.info("Shutting down debounce timer!");
        this.scheduledExecutorService.shutdown();
        // Must bypass the public cancelAllScheduledActions(): its shutdown guard would turn the
        // call into a no-op now that isShuttingDown is set. shutdown() by itself leaves queued
        // delayed tasks in place (default executor policy), so they are cancelled explicitly.
        cancelAndForgetAllActions();
    }

    /**
     * Tries to cancel every pending action and forgets all handles. Does nothing once the
     * scheduler has been stopped.
     */
    public synchronized void cancelAllScheduledActions() {
        if (this.isShuttingDown) {
            return;
        }
        cancelAndForgetAllActions();
    }

    /** Cancels all tracked actions regardless of the shutdown flag and empties the handle map. */
    private void cancelAndForgetAllActions() {
        // ConcurrentHashMap iteration tolerates the removals done by tryCancelScheduledAction.
        this.futureHandles.keySet().forEach(this::tryCancelScheduledAction);
        this.futureHandles.clear();
    }

    /**
     * Removes the handle for the given action and, if the action has not completed, attempts to
     * cancel it without interrupting. When cancellation is refused, waits up to
     * {@link #TIMEOUT_MS} milliseconds for the action to finish.
     *
     * @param str name of the action to cancel
     */
    private void tryCancelScheduledAction(String str) {
        LOG.info("Trying to cancel the action: {}.", str);
        // Remove up front so handles of already-completed actions do not pile up in the map.
        ScheduledFuture<?> scheduledFuture = this.futureHandles.remove(str);
        if (scheduledFuture == null || scheduledFuture.isDone()) {
            return;
        }
        LOG.info("Attempting to cancel the future of action: {}", str);
        if (!scheduledFuture.cancel(false)) {
            try {
                scheduledFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of silently dropping it.
                Thread.currentThread().interrupt();
                LOG.warn("Cancelling the future of action: {} failed.", str, e);
            } catch (Exception e) {
                // Best effort: a failed or timed-out wait must not break the caller.
                LOG.warn("Cancelling the future of action: {} failed.", str, e);
            }
        }
    }

    /**
     * Wraps an action so that failures and interruptions stop the scheduler and are reported to
     * the registered callback instead of being lost inside the executor.
     *
     * @param str      name of the action, used for logging
     * @param runnable the action to wrap
     * @return the wrapped action; it skips execution if the scheduler is already shutting down
     */
    private Runnable getScheduleableAction(String str, Runnable runnable) {
        return () -> {
            try {
                if (!this.isShuttingDown) {
                    runnable.run();
                    // An action may return normally while leaving the interrupt flag set;
                    // treat that the same way as a failure.
                    if (Thread.currentThread().isInterrupted()) {
                        LOG.warn("Action: {} is interrupted.", str);
                        doCleanUpOnTaskException(new InterruptedException());
                    } else {
                        LOG.info("Action: {} completed successfully.", str);
                    }
                }
            } catch (Throwable th) {
                LOG.error("Execution of action: {} failed.", str, th);
                doCleanUpOnTaskException(th);
            }
        };
    }

    /**
     * Stops the scheduler and forwards the failure to the registered callback, if there is one.
     *
     * @param th the failure to report
     */
    private void doCleanUpOnTaskException(Throwable th) {
        stopScheduler();
        this.scheduledTaskCallback.ifPresent(callback -> callback.onError(th));
    }
}
