package org.apache.beam.runners.spark.util;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.beam.runners.spark.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.spark.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.spark.repackaged.com.google.common.cache.CacheBuilder;
import org.apache.beam.runners.spark.repackaged.com.google.common.cache.CacheLoader;
import org.apache.beam.runners.spark.repackaged.com.google.common.cache.LoadingCache;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Maps;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.spark.SparkEnv;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BlockResult;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
import org.joda.time.Instant;
import scala.Option;

/* loaded from: input_file:org/apache/beam/runners/spark/util/GlobalWatermarkHolder.class */
public class GlobalWatermarkHolder {
    /**
     * Watermarks queued per source id. Filled by {@link #add(int, SparkWatermarks)}, drained one
     * element per source by {@link #advance()}, and emptied by {@link #clear()}.
     */
    private static final Map<Integer, Queue<SparkWatermarks>> sourceTimes = new HashMap<>();

    /** Block id under which the current watermarks map is stored in the Spark block manager. */
    private static final BlockId WATERMARKS_BLOCK_ID = BlockId.apply("broadcast_0WATERMARKS");

    /**
     * Last map published by {@link #advance()} in this JVM, or {@code null} if none has been
     * published yet (or {@link #clear()} was called). Volatile because {@link #get(Long)} reads it
     * without holding the class lock.
     */
    private static volatile Map<Integer, SparkWatermarks> driverWatermarks = null;

    /**
     * Lazily created cache through which {@link #get(Long)} loads the watermarks map from the
     * block manager when {@link #driverWatermarks} is not set.
     */
    private static volatile LoadingCache<String, Map<Integer, SparkWatermarks>> watermarkCache = null;

    /* loaded from: input_file:org/apache/beam/runners/spark/util/GlobalWatermarkHolder$SparkWatermarks.class */
    /**
     * Serializable value holder for the three timestamps tracked per source: a low watermark, a
     * high watermark and a synchronized processing time. All fields are final and set once by the
     * constructor.
     */
    public static class SparkWatermarks implements Serializable {
        private final Instant lowWatermark;
        private final Instant highWatermark;
        private final Instant synchronizedProcessingTime;

        /**
         * Creates a holder for the given timestamps; no validation is performed here.
         *
         * @param instant the low watermark
         * @param instant2 the high watermark
         * @param instant3 the synchronized processing time
         */
        @VisibleForTesting
        public SparkWatermarks(Instant instant, Instant instant2, Instant instant3) {
            lowWatermark = instant;
            highWatermark = instant2;
            synchronizedProcessingTime = instant3;
        }

        /** Returns the low watermark given at construction. */
        public Instant getLowWatermark() {
            return lowWatermark;
        }

        /** Returns the high watermark given at construction. */
        public Instant getHighWatermark() {
            return highWatermark;
        }

        /** Returns the synchronized processing time given at construction. */
        public Instant getSynchronizedProcessingTime() {
            return synchronizedProcessingTime;
        }

        /** Renders all three timestamps in a {@code SparkWatermarks{...}} form. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("SparkWatermarks{lowWatermark=").append(lowWatermark);
            text.append(", highWatermark=").append(highWatermark);
            text.append(", synchronizedProcessingTime=").append(synchronizedProcessingTime);
            text.append('}');
            return text.toString();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/util/GlobalWatermarkHolder$WatermarksListener.class */
    public static class WatermarksListener extends JavaStreamingListener {
        public void onBatchCompleted(JavaStreamingListenerBatchCompleted javaStreamingListenerBatchCompleted) {
            GlobalWatermarkHolder.advance();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/util/GlobalWatermarkHolder$WatermarksLoader.class */
    /**
     * Cache loader that reads the watermarks map stored under
     * {@link GlobalWatermarkHolder#WATERMARKS_BLOCK_ID} through the block manager's remote lookup.
     */
    public static class WatermarksLoader extends CacheLoader<String, Map<Integer, SparkWatermarks>> {
        private WatermarksLoader() {
        }

        /**
         * Fetches the watermarks block and returns its first data element as the watermarks map.
         *
         * @param str the cache key; it is ignored, every key loads the same block
         * @return the stored map, or a new empty map when the block is not defined
         * @throws Exception if the block manager lookup fails
         */
        @Override // org.apache.beam.runners.spark.repackaged.com.google.common.cache.CacheLoader
        public Map<Integer, SparkWatermarks> load(@Nonnull String str) throws Exception {
            Option<BlockResult> remote =
                    SparkEnv.get().blockManager().getRemote(GlobalWatermarkHolder.WATERMARKS_BLOCK_ID);
            if (!remote.isDefined()) {
                return Maps.newHashMap();
            }
            // The block is only ever written by advance() with a Map<Integer, SparkWatermarks>,
            // so the unchecked cast reflects what was stored.
            @SuppressWarnings("unchecked")
            Map<Integer, SparkWatermarks> stored =
                    (Map<Integer, SparkWatermarks>) remote.get().data().next();
            return stored;
        }
    }

    /**
     * Queues a watermarks reading for the given source, creating the source's queue on first use.
     *
     * <p>The update runs under the class lock because {@link #advance()} iterates
     * {@code sourceTimes} (a plain {@code HashMap}) and {@link #clear()} empties it under that
     * same lock; an unguarded {@code put} from another thread could corrupt the map or break the
     * iteration.
     *
     * @param i the source id the reading belongs to
     * @param sparkWatermarks the reading to enqueue
     */
    public static void add(int i, SparkWatermarks sparkWatermarks) {
        synchronized (GlobalWatermarkHolder.class) {
            Queue<SparkWatermarks> queue = sourceTimes.get(i);
            if (queue == null) {
                queue = new ConcurrentLinkedQueue<>();
            }
            queue.offer(sparkWatermarks);
            sourceTimes.put(i, queue);
        }
    }

    /**
     * Moves every queued reading from the given map into this holder via
     * {@link #add(int, SparkWatermarks)}. The supplied queues are drained (polled until empty) as
     * a side effect.
     *
     * @param map readings to transfer, keyed by source id
     */
    @VisibleForTesting
    public static void addAll(Map<Integer, Queue<SparkWatermarks>> map) {
        for (Map.Entry<Integer, Queue<SparkWatermarks>> perSource : map.entrySet()) {
            int sourceId = perSource.getKey().intValue();
            Queue<SparkWatermarks> pending = perSource.getValue();
            while (!pending.isEmpty()) {
                SparkWatermarks next = pending.poll();
                add(sourceId, next);
            }
        }
    }

    /**
     * Returns the current watermarks per source id.
     *
     * <p>If {@link #advance()} has published a map in this JVM, that map is returned directly.
     * Otherwise the map is loaded through a lazily created cache backed by the block manager.
     *
     * @param l a duration in milliseconds; only used the first time the cache is created, where
     *     half of it becomes the cache's expire-after-write time
     * @return the watermarks map; empty when nothing has been stored yet
     * @throws RuntimeException wrapping the {@link ExecutionException} if the cache load fails
     */
    public static Map<Integer, SparkWatermarks> get(Long l) {
        // Read the volatile field once: clear() may null it between a check and a second read,
        // which would otherwise let this method return null.
        Map<Integer, SparkWatermarks> published = driverWatermarks;
        if (published != null) {
            return published;
        }
        if (watermarkCache == null) {
            initWatermarkCache(l);
        }
        try {
            return watermarkCache.get("SINGLETON");
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the watermark cache if it does not exist yet. The method holds the class lock and
     * re-checks the field, so at most one cache instance is ever assigned.
     *
     * @param l a duration in milliseconds; cache entries expire half of it after being written
     */
    private static synchronized void initWatermarkCache(Long l) {
        if (watermarkCache != null) {
            return;
        }
        long expiryMillis = l.longValue() / 2;
        watermarkCache =
                CacheBuilder.newBuilder()
                        .expireAfterWrite(expiryMillis, TimeUnit.MILLISECONDS)
                        .build(new WatermarksLoader());
    }

    /**
     * Advances the published watermarks by consuming one queued reading per source.
     *
     * <p>Under the class lock, for every source whose queue is non-empty this polls a single
     * reading and combines it with the source's previously stored watermarks: the low and high
     * watermarks never move backwards (the later of stored and polled wins), and the synchronized
     * processing time is taken from the polled reading. If at least one source produced a new
     * value, the resulting map replaces both {@code driverWatermarks} and the block stored under
     * {@code WATERMARKS_BLOCK_ID}.
     *
     * <p>NOTE(review): the published map contains only the sources that had a queued reading in
     * this call; sources with an empty queue are not carried over from the previous map. Confirm
     * this is intended.
     *
     * @throws IllegalStateException if a resulting low watermark is after the high watermark, or
     *     if the polled synchronized processing time is not strictly after the stored one
     */
    public static void advance() {
        synchronized (GlobalWatermarkHolder.class) {
            BlockManager blockManager = SparkEnv.get().blockManager();
            if (sourceTimes.isEmpty()) {
                return;
            }

            Map<Integer, SparkWatermarks> updated = new HashMap<>();
            for (Map.Entry<Integer, Queue<SparkWatermarks>> entry : sourceTimes.entrySet()) {
                Queue<SparkWatermarks> pending = entry.getValue();
                if (pending.isEmpty()) {
                    continue;
                }
                Integer sourceId = entry.getKey();

                // Start from the minimum timestamp when the source has no stored state yet.
                Instant currentLow = BoundedWindow.TIMESTAMP_MIN_VALUE;
                Instant currentHigh = BoundedWindow.TIMESTAMP_MIN_VALUE;
                Instant currentProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;

                Map<Integer, SparkWatermarks> stored = fetchStoredWatermarks(blockManager);
                SparkWatermarks previous = stored.get(sourceId);
                if (previous != null) {
                    currentLow = previous.getLowWatermark();
                    currentHigh = previous.getHighWatermark();
                    currentProcessingTime = previous.getSynchronizedProcessingTime();
                }

                SparkWatermarks next = pending.poll();
                // Watermarks are monotonic: keep the stored value unless the new one is later.
                Instant newLow =
                        next.getLowWatermark().isAfter(currentLow) ? next.getLowWatermark() : currentLow;
                Instant newHigh =
                        next.getHighWatermark().isAfter(currentHigh) ? next.getHighWatermark() : currentHigh;
                Instant newProcessingTime = next.getSynchronizedProcessingTime();

                // Template overload: the message is only formatted when the check fails.
                Preconditions.checkState(
                        !newLow.isAfter(newHigh),
                        "Low watermark %s cannot be later then high watermark %s",
                        newLow,
                        newHigh);
                Preconditions.checkState(
                        newProcessingTime.isAfter(currentProcessingTime),
                        "Synchronized processing time must advance.");

                updated.put(sourceId, new SparkWatermarks(newLow, newHigh, newProcessingTime));
            }

            // Republish only when something actually changed.
            if (!updated.isEmpty()) {
                driverWatermarks = updated;
                blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
                blockManager.putSingle(WATERMARKS_BLOCK_ID, updated, StorageLevel.MEMORY_ONLY(), true);
            }
        }
    }

    /**
     * Reads the watermarks map stored under {@code WATERMARKS_BLOCK_ID}; when the block does not
     * exist, stores a new empty map under that id and returns it.
     */
    private static Map<Integer, SparkWatermarks> fetchStoredWatermarks(BlockManager blockManager) {
        Option<BlockResult> remote = blockManager.getRemote(WATERMARKS_BLOCK_ID);
        if (remote.isDefined()) {
            // The block is only ever written with a Map<Integer, SparkWatermarks>.
            @SuppressWarnings("unchecked")
            Map<Integer, SparkWatermarks> stored =
                    (Map<Integer, SparkWatermarks>) remote.get().data().next();
            return stored;
        }
        Map<Integer, SparkWatermarks> empty = Maps.newHashMap();
        blockManager.putSingle(WATERMARKS_BLOCK_ID, empty, StorageLevel.MEMORY_ONLY(), true);
        return empty;
    }

    /**
     * Resets the holder: empties the queued readings, drops the locally published map and, when a
     * {@code SparkEnv} is available, removes the watermarks block from the block manager. Runs
     * under the class lock.
     */
    @VisibleForTesting
    public static synchronized void clear() {
        sourceTimes.clear();
        driverWatermarks = null;

        SparkEnv env = SparkEnv.get();
        if (env == null) {
            // No Spark environment in this JVM, so there is no block to remove.
            return;
        }
        BlockManager blockManager = env.blockManager();
        blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
    }
}
