package org.apache.samza.operators.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.samza.operators.triggers.Cancellable;
import org.apache.samza.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/operators/impl/TriggerScheduler.class */
/**
 * Holds callbacks that are scheduled for a given time and runs the ones that have become due.
 *
 * <p>Pending callbacks live in a {@link PriorityBlockingQueue} ordered by their scheduled time,
 * so the earliest callback is always at the head. Nothing runs on its own: callbacks are only
 * executed when {@link #runPendingCallbacks()} is invoked, on the invoking thread.
 *
 * @param <WK> type parameter of the {@link TriggerKey} associated with each callback
 */
public class TriggerScheduler<WK> {
    private static final Logger LOG = LoggerFactory.getLogger(TriggerScheduler.class);

    /** Callbacks that have been scheduled but neither run nor cancelled, earliest first. */
    private final Queue<TriggerCallbackState> pendingCallbacks = new PriorityBlockingQueue<>();

    /** Source of the current time used to decide which callbacks are due. */
    private final Clock clock;

    /**
     * A single scheduled callback together with its trigger key and due time.
     *
     * <p>This is deliberately a (non-static) inner class: it uses the enclosing scheduler's
     * {@code WK} and removes itself from the enclosing scheduler's queue on {@link #cancel()}.
     * Ordering is by scheduled time only; callbacks with the same time compare as equal.
     */
    private class TriggerCallbackState implements Comparable<TriggerCallbackState>, Cancellable {
        private final TriggerKey<WK> triggerKey;
        private final Runnable callback;
        private final long scheduledTimeMs;

        private TriggerCallbackState(TriggerKey<WK> triggerKey, Runnable callback, long scheduledTimeMs) {
            this.triggerKey = triggerKey;
            this.callback = callback;
            this.scheduledTimeMs = scheduledTimeMs;
        }

        private Runnable getCallback() {
            return this.callback;
        }

        private long getScheduledTimeMs() {
            return this.scheduledTimeMs;
        }

        private TriggerKey<WK> getTriggerKey() {
            return this.triggerKey;
        }

        /** Orders callbacks by ascending scheduled time. */
        @Override
        public int compareTo(TriggerCallbackState other) {
            return Long.compare(this.scheduledTimeMs, other.scheduledTimeMs);
        }

        /**
         * Removes this callback from the scheduler's pending queue.
         *
         * @return {@code true} if the callback was still pending and has been removed,
         *         {@code false} if it was no longer in the queue
         */
        @Override
        public boolean cancel() {
            // The trace line is emitted before the removal, so it is logged even when the
            // callback is no longer pending and remove() returns false.
            LOG.trace("Cancelled a callback: {} at {} for triggerKey {}",
                new Object[] {this.callback, this.scheduledTimeMs, this.triggerKey});
            return TriggerScheduler.this.pendingCallbacks.remove(this);
        }
    }

    /**
     * Creates a scheduler with no pending callbacks.
     *
     * @param clock clock queried by {@link #runPendingCallbacks()} for the current time
     */
    public TriggerScheduler(Clock clock) {
        this.clock = clock;
    }

    /**
     * Registers a callback to be run once the clock reaches the given time.
     *
     * @param runnable   the callback to run
     * @param j          the scheduled time in milliseconds, compared against
     *                   {@code clock.currentTimeMillis()}
     * @param triggerKey the key reported by {@link #runPendingCallbacks()} when the callback runs
     * @return a handle whose {@link Cancellable#cancel()} removes the callback from the queue
     */
    public Cancellable scheduleCallback(Runnable runnable, long j, TriggerKey<WK> triggerKey) {
        TriggerCallbackState scheduled = new TriggerCallbackState(triggerKey, runnable, j);
        this.pendingCallbacks.add(scheduled);
        LOG.trace("Scheduled a new callback: {} at {} for triggerKey {}",
            new Object[] {runnable, j, triggerKey});
        return scheduled;
    }

    /**
     * Runs every pending callback whose scheduled time is at or before the current time.
     *
     * <p>The clock is read once on entry. Callbacks are run in ascending order of scheduled
     * time, each one being removed from the queue before it is run. An exception thrown by a
     * callback propagates to the caller; that callback has already been removed by then.
     *
     * @return the trigger keys of the callbacks that were run, in execution order
     *         (empty if nothing was due)
     */
    public List<TriggerKey<WK>> runPendingCallbacks() {
        List<TriggerKey<WK>> firedKeys = new ArrayList<>();
        long now = this.clock.currentTimeMillis();

        TriggerCallbackState head;
        while ((head = this.pendingCallbacks.peek()) != null && head.getScheduledTimeMs() <= now) {
            // NOTE(review): peek() followed by remove() is not atomic; this presumably relies
            // on a single thread scheduling/cancelling/draining at a time - confirm with callers.
            this.pendingCallbacks.remove();
            head.getCallback().run();
            firedKeys.add(head.getTriggerKey());
        }
        return firedKeys;
    }
}
