package org.apache.samza.operators.impl;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.WatermarkMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/samza/operators/impl/WatermarkStates.class */
/**
 * Holds one {@link WatermarkState} per input {@link SystemStreamPartition} and
 * answers watermark queries per partition or per {@link SystemStream}.
 *
 * <p>The set of tracked partitions is fixed at construction time; the map is
 * wrapped as unmodifiable. Each per-partition state guards its own mutation
 * with {@code synchronized} and publishes the computed watermark through a
 * {@code volatile} field, so reads do not take the lock.
 */
public class WatermarkStates {
    private static final Logger LOG = LoggerFactory.getLogger(WatermarkStates.class);

    /** Sentinel value reported while no watermark has been computed. */
    public static final long WATERMARK_NOT_EXIST = -1;

    /** Unmodifiable mapping from each tracked partition to its watermark state. */
    private final Map<SystemStreamPartition, WatermarkState> watermarkStates;

    /**
     * Watermark bookkeeping for a single partition: the latest timestamp seen
     * from each named task and the watermark derived from them.
     */
    public static final class WatermarkState {
        /**
         * Number of distinct task names that must have reported before a
         * watermark is computed.
         * NOTE(review): presumably the upstream task count for the stream,
         * with 0 meaning "no named upstream tasks" — confirm against callers.
         */
        private final int expectedTotal;

        /** Latest accepted timestamp per task name. */
        private final Map<String, Long> timestamps = new HashMap<>();

        /** Current watermark; volatile so unsynchronized readers see updates. */
        private volatile long watermarkTime = WATERMARK_NOT_EXIST;

        /**
         * @param expectedTotal number of distinct task names required before
         *                      the watermark is computed
         */
        WatermarkState(int expectedTotal) {
            this.expectedTotal = expectedTotal;
        }

        /**
         * Records a watermark timestamp and recomputes the partition watermark
         * once the number of reporting tasks equals {@code expectedTotal}.
         *
         * <p>A timestamp lower than the one already stored for the same task is
         * ignored with a warning. When {@code taskName} is {@code null} nothing
         * is stored; if {@code expectedTotal} is 0 the watermark then becomes
         * {@code timestamp} itself, because there are no stored values to take
         * the minimum of.
         *
         * @param timestamp watermark timestamp carried by the message
         * @param taskName  name of the task that sent it, may be {@code null}
         */
        synchronized void update(long timestamp, String taskName) {
            if (taskName != null) {
                Long previous = timestamps.get(taskName);
                if (previous == null || previous <= timestamp) {
                    timestamps.put(taskName, timestamp);
                } else {
                    // Per-task timestamps never move backwards; keep the stored value.
                    LOG.warn("Incoming watermark {} is smaller than existing watermark {} for upstream task {}",
                            timestamp, previous, taskName);
                }
            }
            if (timestamps.size() == expectedTotal) {
                // The watermark is the slowest task's timestamp; the fallback only
                // applies when the map is empty (expectedTotal == 0).
                watermarkTime = timestamps.values().stream()
                        .mapToLong(Long::longValue)
                        .min()
                        .orElse(timestamp);
            }
        }

        /**
         * @return the current watermark, or {@link WatermarkStates#WATERMARK_NOT_EXIST}
         *         if none has been computed yet
         */
        long getWatermarkTime() {
            return watermarkTime;
        }
    }

    /**
     * Creates one watermark state per partition in {@code set}.
     *
     * @param set partitions to track
     * @param map expected task count per stream; a stream missing from this
     *            map gets an expected count of 0
     */
    public WatermarkStates(Set<SystemStreamPartition> set, Map<SystemStream, Integer> map) {
        Map<SystemStreamPartition, WatermarkState> states = new HashMap<>();
        for (SystemStreamPartition ssp : set) {
            int expectedTotal = map.getOrDefault(ssp.getSystemStream(), 0);
            states.put(ssp, new WatermarkState(expectedTotal));
        }
        this.watermarkStates = Collections.unmodifiableMap(states);
    }

    /**
     * Applies a watermark message to the state of the given partition. If the
     * partition is not tracked, an error is logged and nothing else happens.
     *
     * @param watermarkMessage      message supplying the timestamp and task name
     * @param systemStreamPartition partition the message arrived on
     */
    public void update(WatermarkMessage watermarkMessage, SystemStreamPartition systemStreamPartition) {
        WatermarkState state = watermarkStates.get(systemStreamPartition);
        if (state == null) {
            LOG.error("SSP {} doesn't have watermark states", systemStreamPartition);
            return;
        }
        state.update(watermarkMessage.getTimestamp(), watermarkMessage.getTaskName());
    }

    /**
     * Returns the minimum watermark over all tracked partitions of a stream.
     * Because partitions without a watermark report {@link #WATERMARK_NOT_EXIST},
     * a single such partition makes the result {@link #WATERMARK_NOT_EXIST}.
     *
     * @param systemStream stream to query
     * @return the smallest partition watermark, or {@link #WATERMARK_NOT_EXIST}
     *         if no tracked partition belongs to the stream
     */
    public long getWatermark(SystemStream systemStream) {
        return watermarkStates.entrySet().stream()
                .filter(entry -> entry.getKey().getSystemStream().equals(systemStream))
                .mapToLong(entry -> entry.getValue().getWatermarkTime())
                .min()
                .orElse(WATERMARK_NOT_EXIST);
    }

    /**
     * Returns the watermark of a single partition.
     *
     * @param systemStreamPartition partition to query
     * @return the partition's watermark, or {@link #WATERMARK_NOT_EXIST} if the
     *         partition is not tracked or has no watermark yet
     */
    long getWatermarkPerSSP(SystemStreamPartition systemStreamPartition) {
        WatermarkState state = watermarkStates.get(systemStreamPartition);
        // An untracked partition has no state; report "no watermark" instead of
        // dereferencing null.
        return state == null ? WATERMARK_NOT_EXIST : state.getWatermarkTime();
    }
}
