package org.apache.samza.startpoint;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.system.SystemStreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/startpoint/StartpointManager.class */
public class StartpointManager {
    private static final Integer VERSION = 1;
    public static final String NAMESPACE = "samza-startpoint-v" + VERSION;
    static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12);
    private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class);
    private static final String NAMESPACE_FAN_OUT = NAMESPACE + "-fan-out";
    private final NamespaceAwareCoordinatorStreamStore fanOutStore;
    private final NamespaceAwareCoordinatorStreamStore readWriteStore;
    private final ObjectMapper objectMapper = StartpointObjectMapper.getObjectMapper();
    private boolean stopped = true;

    public StartpointManager(MetadataStore metadataStore) {
        Preconditions.checkNotNull(metadataStore, "MetadataStore cannot be null");
        this.readWriteStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, NAMESPACE);
        this.fanOutStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, NAMESPACE_FAN_OUT);
        LOG.info("Startpoints are written to namespace: {} and fanned out to namespace: {} in the metadata store of type: {}", new Object[]{NAMESPACE, NAMESPACE_FAN_OUT, metadataStore.getClass().getCanonicalName()});
    }

    public void start() {
        if (!this.stopped) {
            LOG.warn("already started");
            return;
        }
        LOG.info("starting");
        this.readWriteStore.init();
        this.fanOutStore.init();
        this.stopped = false;
    }

    public void stop() {
        if (this.stopped) {
            LOG.warn("already stopped");
            return;
        }
        LOG.info("stopping");
        this.readWriteStore.close();
        this.fanOutStore.close();
        this.stopped = true;
    }

    public void writeStartpoint(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
        writeStartpoint(systemStreamPartition, null, startpoint);
    }

    public void writeStartpoint(SystemStreamPartition systemStreamPartition, TaskName taskName, Startpoint startpoint) {
        Preconditions.checkState(!this.stopped, "Underlying metadata store not available");
        Preconditions.checkNotNull(systemStreamPartition, "SystemStreamPartition cannot be null");
        Preconditions.checkNotNull(startpoint, "Startpoint cannot be null");
        try {
            this.readWriteStore.put(toReadWriteStoreKey(systemStreamPartition, taskName), this.objectMapper.writeValueAsBytes(startpoint));
            this.readWriteStore.flush();
        } catch (Exception e) {
            throw new SamzaException(String.format("Startpoint for SSP: %s and task: %s may not have been written to the metadata store.", systemStreamPartition, taskName), e);
        }
    }

    @VisibleForTesting
    public Optional<Startpoint> readStartpoint(SystemStreamPartition systemStreamPartition) {
        return readStartpoint(this.readWriteStore.all(), systemStreamPartition, null);
    }

    @VisibleForTesting
    public Optional<Startpoint> readStartpoint(SystemStreamPartition systemStreamPartition, TaskName taskName) {
        return readStartpoint(this.readWriteStore.all(), systemStreamPartition, taskName);
    }

    public Optional<Startpoint> readStartpoint(Map<String, byte[]> map, SystemStreamPartition systemStreamPartition, TaskName taskName) {
        Preconditions.checkState(!this.stopped, "Underlying metadata store not available");
        Preconditions.checkNotNull(systemStreamPartition, "SystemStreamPartition cannot be null");
        byte[] bArr = map.get(toReadWriteStoreKey(systemStreamPartition, taskName));
        if (ArrayUtils.isNotEmpty(bArr)) {
            try {
                Startpoint startpoint = (Startpoint) this.objectMapper.readValue(bArr, Startpoint.class);
                if (Instant.now().minus((TemporalAmount) DEFAULT_EXPIRATION_DURATION).isBefore(Instant.ofEpochMilli(startpoint.getCreationTimestamp()))) {
                    return Optional.of(startpoint);
                }
                LOG.warn("Creation timestamp: {} of startpoint: {} has crossed the expiration duration: {}. Ignoring it", new Object[]{Long.valueOf(startpoint.getCreationTimestamp()), startpoint, DEFAULT_EXPIRATION_DURATION});
            } catch (IOException e) {
                throw new SamzaException(e);
            }
        }
        return Optional.empty();
    }

    public void deleteStartpoint(SystemStreamPartition systemStreamPartition) {
        deleteStartpoint(systemStreamPartition, null);
    }

    public void deleteStartpoint(SystemStreamPartition systemStreamPartition, TaskName taskName) {
        Preconditions.checkState(!this.stopped, "Underlying metadata store not available");
        Preconditions.checkNotNull(systemStreamPartition, "SystemStreamPartition cannot be null");
        this.readWriteStore.delete(toReadWriteStoreKey(systemStreamPartition, taskName));
        this.readWriteStore.flush();
    }

    public void deleteAllStartpoints() {
        Iterator<String> it = this.readWriteStore.all().keySet().iterator();
        while (it.hasNext()) {
            this.readWriteStore.delete(it.next());
        }
        this.readWriteStore.flush();
    }

    public Map<TaskName, Map<SystemStreamPartition, Startpoint>> fanOut(Map<TaskName, Set<SystemStreamPartition>> map) throws IOException {
        Preconditions.checkState(!this.stopped, "Underlying metadata store not available");
        Preconditions.checkArgument(MapUtils.isNotEmpty(map), "taskToSSPs cannot be null or empty");
        Instant now = Instant.now();
        HashMultimap create = HashMultimap.create();
        HashMap hashMap = new HashMap();
        Map<String, byte[]> all = this.readWriteStore.all();
        for (TaskName taskName : map.keySet()) {
            Set<SystemStreamPartition> set = map.get(taskName);
            if (CollectionUtils.isEmpty(set)) {
                LOG.warn("No SSPs are mapped to taskName: {}", taskName.getTaskName());
            } else {
                for (SystemStreamPartition systemStreamPartition : set) {
                    Optional<Startpoint> readStartpoint = readStartpoint(all, systemStreamPartition, null);
                    readStartpoint.ifPresent(startpoint -> {
                        create.put(systemStreamPartition, (Object) null);
                    });
                    Optional<Startpoint> readStartpoint2 = readStartpoint(all, systemStreamPartition, taskName);
                    readStartpoint2.ifPresent(startpoint2 -> {
                        create.put(systemStreamPartition, taskName);
                    });
                    Optional<Startpoint> resolveStartpointPrecendence = resolveStartpointPrecendence(readStartpoint, readStartpoint2);
                    if (resolveStartpointPrecendence.isPresent()) {
                        hashMap.putIfAbsent(taskName, new StartpointFanOutPerTask(now));
                        ((StartpointFanOutPerTask) hashMap.get(taskName)).getFanOuts().put(systemStreamPartition, resolveStartpointPrecendence.get());
                    }
                }
            }
        }
        if (hashMap.isEmpty()) {
            LOG.debug("No fan outs created.");
            return ImmutableMap.of();
        }
        LOG.info("Fanning out to {} tasks", Integer.valueOf(hashMap.size()));
        for (TaskName taskName2 : hashMap.keySet()) {
            this.fanOutStore.put(toFanOutStoreKey(taskName2), this.objectMapper.writeValueAsBytes((StartpointFanOutPerTask) hashMap.get(taskName2)));
        }
        this.fanOutStore.flush();
        for (SystemStreamPartition systemStreamPartition2 : create.keySet()) {
            for (TaskName taskName3 : create.get(systemStreamPartition2)) {
                if (taskName3 != null) {
                    deleteStartpoint(systemStreamPartition2, taskName3);
                } else {
                    deleteStartpoint(systemStreamPartition2);
                }
            }
        }
        return ImmutableMap.copyOf((Map) hashMap.entrySet().stream().collect(Collectors.toMap(entry -> {
            return (TaskName) entry.getKey();
        }, entry2 -> {
            return ((StartpointFanOutPerTask) entry2.getValue()).getFanOuts();
        })));
    }

    public Map<SystemStreamPartition, Startpoint> getFanOutForTask(TaskName taskName) throws IOException {
        return (Map) getStartpointFanOutPerTask(taskName).map(startpointFanOutPerTask -> {
            return ImmutableMap.copyOf(startpointFanOutPerTask.getFanOuts());
        }).orElse(ImmutableMap.of());
    }

    private Optional<StartpointFanOutPerTask> getStartpointFanOutPerTask(TaskName taskName) throws IOException {
        Preconditions.checkState(!this.stopped, "Underlying metadata store not available");
        Preconditions.checkNotNull(taskName, "TaskName cannot be null");
        byte[] bArr = this.fanOutStore.get(toFanOutStoreKey(taskName));
        return ArrayUtils.isEmpty(bArr) ? Optional.empty() : Optional.of(this.objectMapper.readValue(bArr, StartpointFanOutPerTask.class));
    }

    public void removeFanOutForTaskSSPs(TaskName taskName, Set<SystemStreamPartition> set) {
        Preconditions.checkState(!this.stopped, "Underlying metadata store not available");
        Preconditions.checkNotNull(taskName, "TaskName cannot be null");
        if (set == null || set.isEmpty()) {
            return;
        }
        try {
            getStartpointFanOutPerTask(taskName).ifPresent(startpointFanOutPerTask -> {
                Map<SystemStreamPartition, Startpoint> fanOuts = startpointFanOutPerTask.getFanOuts();
                fanOuts.entrySet().removeIf(entry -> {
                    return set.contains(entry.getKey());
                });
                if (fanOuts.isEmpty()) {
                    removeFanOutForTask(taskName);
                    LOG.debug("Deleted the fanned out startpoints for the task {}", taskName);
                    return;
                }
                try {
                    this.fanOutStore.put(toFanOutStoreKey(taskName), this.objectMapper.writeValueAsBytes(startpointFanOutPerTask));
                    this.fanOutStore.flush();
                } catch (IOException e) {
                    LOG.error("Can't update the fanned out startpoints for task {}", taskName, e);
                    throw new SamzaException(e);
                }
            });
        } catch (IOException e) {
            LOG.error("Can't get the fanned out startpoints for task {}", taskName, e);
            throw new SamzaException(e);
        }
    }

    public void removeFanOutForTask(TaskName taskName) {
        Preconditions.checkState(!this.stopped, "Underlying metadata store not available");
        Preconditions.checkNotNull(taskName, "TaskName cannot be null");
        this.fanOutStore.delete(toFanOutStoreKey(taskName));
        this.fanOutStore.flush();
    }

    public void removeAllFanOuts() {
        Iterator<String> it = this.fanOutStore.all().keySet().iterator();
        while (it.hasNext()) {
            this.fanOutStore.delete(it.next());
        }
        this.fanOutStore.flush();
    }

    @VisibleForTesting
    MetadataStore getReadWriteStore() {
        return this.readWriteStore;
    }

    @VisibleForTesting
    MetadataStore getFanOutStore() {
        return this.fanOutStore;
    }

    @VisibleForTesting
    ObjectMapper getObjectMapper() {
        return this.objectMapper;
    }

    private static Optional<Startpoint> resolveStartpointPrecendence(Optional<Startpoint> optional, Optional<Startpoint> optional2) {
        return (optional.isPresent() && optional2.isPresent()) ? optional.get().getCreationTimestamp() > optional2.get().getCreationTimestamp() ? optional : optional2 : optional.isPresent() ? optional : optional2;
    }

    private static String toReadWriteStoreKey(SystemStreamPartition systemStreamPartition, TaskName taskName) {
        Preconditions.checkArgument(systemStreamPartition != null, "SystemStreamPartition should be defined");
        Preconditions.checkArgument(StringUtils.isNotBlank(systemStreamPartition.getSystem()), "System should be defined");
        Preconditions.checkArgument(StringUtils.isNotBlank(systemStreamPartition.getStream()), "Stream should be defined");
        Preconditions.checkArgument(systemStreamPartition.getPartition() != null, "Partition should be defined");
        String str = systemStreamPartition.getSystem() + "." + systemStreamPartition.getStream() + "." + String.valueOf(systemStreamPartition.getPartition().getPartitionId());
        if (taskName != null) {
            str = str + "." + taskName.getTaskName();
        }
        return str;
    }

    private static String toFanOutStoreKey(TaskName taskName) {
        Preconditions.checkArgument(taskName != null, "TaskName should be defined");
        Preconditions.checkArgument(StringUtils.isNotBlank(taskName.getTaskName()), "TaskName should not be blank");
        return taskName.getTaskName();
    }
}
