package org.apache.beam.runners.direct;

import java.io.Serializable;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ComparisonChain;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Ordering;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/WatermarkCallbackExecutor.class */
/**
 * Schedules {@link Runnable} callbacks that are handed to an {@link Executor} once the watermark
 * of a given {@link AppliedPTransform} has advanced far enough.
 *
 * <p>Callbacks are kept in one priority queue per transform, ordered by the instant after which
 * they may fire. The map of queues is a concurrent map, and every read or write of an individual
 * queue is performed while synchronized on that queue.
 */
public class WatermarkCallbackExecutor {
    /**
     * Initial capacity handed to each {@link PriorityQueue}. The value mirrors the JDK default;
     * it is spelled out because the two-argument constructor is the one that accepts a comparator.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 11;

    /** Pending callbacks per transform, earliest {@code fireAfter} first. */
    private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>> callbacks = new ConcurrentHashMap<>();

    /** Executor that receives every callback whose firing condition has been met. */
    private final Executor executor;

    /**
     * Orders {@link WatermarkCallback}s by their {@code fireAfter} instant, breaking ties between
     * callbacks with {@link Ordering#arbitrary()} applied to the wrapped {@link Runnable}s.
     */
    public static class CallbackOrdering extends Ordering<WatermarkCallback> implements Serializable {
        private CallbackOrdering() {
        }

        /**
         * Compares two callbacks, first by {@code fireAfter} and then arbitrarily by runnable.
         *
         * @param watermarkCallback the left-hand callback
         * @param watermarkCallback2 the right-hand callback
         * @return a negative, zero, or positive value following the {@link java.util.Comparator}
         *     contract
         */
        @Override // org.apache.beam.runners.direct.repackaged.com.google.common.collect.Ordering, java.util.Comparator
        public int compare(WatermarkCallback watermarkCallback, WatermarkCallback watermarkCallback2) {
            return ComparisonChain.start()
                    .compare(watermarkCallback.fireAfter, watermarkCallback2.fireAfter)
                    .compare(watermarkCallback.callback, watermarkCallback2.callback, Ordering.arbitrary())
                    .result();
        }
    }

    /** A {@link Runnable} paired with the instant the watermark must pass before it may run. */
    public static class WatermarkCallback {
        private final Instant fireAfter;
        private final Runnable callback;

        /**
         * Creates a callback whose threshold is the watermark reported by the strategy's trigger
         * via {@code getWatermarkThatGuaranteesFiring} for the given window.
         *
         * @param boundedWindow the window the callback is registered for
         * @param windowingStrategy the strategy whose trigger supplies the threshold
         * @param runnable the action to run
         * @return the new callback
         */
        public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(BoundedWindow boundedWindow, WindowingStrategy<?, W> windowingStrategy, Runnable runnable) {
            Instant firingWatermark = windowingStrategy.getTrigger().getWatermarkThatGuaranteesFiring(boundedWindow);
            return new WatermarkCallback(firingWatermark, runnable);
        }

        /**
         * Creates a callback whose threshold is the window's maximum timestamp plus the
         * strategy's allowed lateness plus one (Joda {@code Instant.plus(long)} adds
         * milliseconds).
         *
         * @param boundedWindow the window whose expiration is awaited
         * @param windowingStrategy the strategy supplying the allowed lateness
         * @param runnable the action to run
         * @return the new callback
         */
        public static <W extends BoundedWindow> WatermarkCallback afterWindowExpiration(BoundedWindow boundedWindow, WindowingStrategy<?, W> windowingStrategy, Runnable runnable) {
            Instant expiration = boundedWindow.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).plus(1L);
            return new WatermarkCallback(expiration, runnable);
        }

        private WatermarkCallback(Instant instant, Runnable runnable) {
            this.fireAfter = instant;
            this.callback = runnable;
        }

        /**
         * Reports whether this callback may fire at the given watermark.
         *
         * @param instant the current watermark
         * @return {@code true} if {@code instant} is strictly after the threshold, or if
         *     {@code instant} equals {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
         */
        public boolean shouldFire(Instant instant) {
            return instant.isAfter(this.fireAfter) || instant.equals(BoundedWindow.TIMESTAMP_MAX_VALUE);
        }

        /** Returns the action wrapped by this callback. */
        public Runnable getCallback() {
            return this.callback;
        }
    }

    /**
     * Creates an executor that submits due callbacks to {@code executor}.
     *
     * @param executor the executor that will run fired callbacks
     * @return a new, empty {@link WatermarkCallbackExecutor}
     */
    public static WatermarkCallbackExecutor create(Executor executor) {
        return new WatermarkCallbackExecutor(executor);
    }

    private WatermarkCallbackExecutor(Executor executor) {
        this.executor = executor;
    }

    /**
     * Registers {@code runnable} to be executed once the watermark of {@code appliedPTransform}
     * satisfies the threshold computed by {@link WatermarkCallback#onGuaranteedFiring}.
     *
     * @param appliedPTransform the transform whose watermark is observed
     * @param boundedWindow the window the callback belongs to
     * @param windowingStrategy the windowing strategy supplying the trigger
     * @param runnable the action to run
     */
    public void callOnGuaranteedFiring(AppliedPTransform<?, ?, ?> appliedPTransform, BoundedWindow boundedWindow, WindowingStrategy<?, ?> windowingStrategy, Runnable runnable) {
        enqueue(appliedPTransform, WatermarkCallback.onGuaranteedFiring(boundedWindow, windowingStrategy, runnable));
    }

    /**
     * Registers {@code runnable} to be executed once the watermark of {@code appliedPTransform}
     * satisfies the threshold computed by {@link WatermarkCallback#afterWindowExpiration}.
     *
     * @param appliedPTransform the transform whose watermark is observed
     * @param boundedWindow the window whose expiration is awaited
     * @param windowingStrategy the windowing strategy supplying the allowed lateness
     * @param runnable the action to run
     */
    public void callOnWindowExpiration(AppliedPTransform<?, ?, ?> appliedPTransform, BoundedWindow boundedWindow, WindowingStrategy<?, ?> windowingStrategy, Runnable runnable) {
        enqueue(appliedPTransform, WatermarkCallback.afterWindowExpiration(boundedWindow, windowingStrategy, runnable));
    }

    /**
     * Submits to the executor every pending callback of {@code appliedPTransform} for which
     * {@link WatermarkCallback#shouldFire} returns {@code true} at {@code instant}, in queue
     * order, removing each from the queue. Does nothing if no callback was ever registered for
     * the transform.
     *
     * @param appliedPTransform the transform whose watermark advanced
     * @param instant the transform's current watermark
     */
    public void fireForWatermark(AppliedPTransform<?, ?, ?> appliedPTransform, Instant instant) {
        PriorityQueue<WatermarkCallback> pending = this.callbacks.get(appliedPTransform);
        if (pending == null) {
            return;
        }
        // NOTE(review): execute() is invoked while the queue monitor is held; with an executor
        // that runs tasks on the calling thread the callback itself runs under this lock.
        synchronized (pending) {
            // The queue is ordered by fireAfter, so stop at the first callback that is not due.
            while (!pending.isEmpty() && pending.peek().shouldFire(instant)) {
                this.executor.execute(pending.poll().getCallback());
            }
        }
    }

    /** Adds {@code callback} to the queue of {@code step}, creating the queue on first use. */
    private void enqueue(AppliedPTransform<?, ?, ?> step, WatermarkCallback callback) {
        PriorityQueue<WatermarkCallback> pending = this.callbacks.get(step);
        if (pending == null) {
            PriorityQueue<WatermarkCallback> created = new PriorityQueue<>(INITIAL_QUEUE_CAPACITY, new CallbackOrdering());
            // putIfAbsent returns the queue installed by a racing thread, or null if ours won.
            PriorityQueue<WatermarkCallback> raced = this.callbacks.putIfAbsent(step, created);
            pending = raced != null ? raced : created;
        }
        synchronized (pending) {
            pending.offer(callback);
        }
    }
}
