package org.apache.samza.storage;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.samza.SamzaException;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.apache.samza.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/storage/TaskSideInputStorageManager.class */
public class TaskSideInputStorageManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputStorageManager.class);
    private static final long STORE_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1);
    private final Clock clock;
    private final Map<String, StorageEngine> stores;
    private final File storeBaseDir;
    private final Map<String, Set<SystemStreamPartition>> storeToSSps;
    private final TaskName taskName;
    private final TaskMode taskMode;
    private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();

    public TaskSideInputStorageManager(TaskName taskName, TaskMode taskMode, File file, Map<String, StorageEngine> map, Map<String, Set<SystemStreamPartition>> map2, Clock clock) {
        validateStoreConfiguration(map);
        this.clock = clock;
        this.stores = map;
        this.storeBaseDir = file;
        this.storeToSSps = map2;
        this.taskName = taskName;
        this.taskMode = taskMode;
    }

    public TaskName getTaskName() {
        return this.taskName;
    }

    public void init() {
        LOG.info("Initializing side input stores.");
        initializeStoreDirectories();
    }

    public void flush(Map<SystemStreamPartition, String> map) {
        LOG.info("Flushing the side input stores.");
        this.stores.values().forEach((v0) -> {
            v0.flush();
        });
        writeFileOffsets(map);
    }

    public void stop(Map<SystemStreamPartition, String> map) {
        LOG.info("Stopping the side input stores.");
        this.stores.values().forEach((v0) -> {
            v0.stop();
        });
        writeFileOffsets(map);
    }

    public StorageEngine getStore(String str) {
        return this.stores.get(str);
    }

    private void initializeStoreDirectories() {
        LOG.info("Initializing side input store directories.");
        this.stores.keySet().forEach(str -> {
            File storeLocation = getStoreLocation(str);
            String path = storeLocation.toPath().toString();
            if (!isValidSideInputStore(str, storeLocation)) {
                LOG.info("Cleaning up the store directory at {} for {}", path, str);
                new FileUtil().rm(storeLocation);
            }
            if (!isPersistedStore(str) || storeLocation.exists()) {
                return;
            }
            LOG.info("Creating {} as the store directory for the side input store {}", path, str);
            storeLocation.mkdirs();
        });
    }

    public void writeFileOffsets(Map<SystemStreamPartition, String> map) {
        this.storeToSSps.entrySet().stream().filter(entry -> {
            return isPersistedStore((String) entry.getKey());
        }).forEach(entry2 -> {
            String str = (String) entry2.getKey();
            Stream stream = ((Set) entry2.getValue()).stream();
            map.getClass();
            Stream filter = stream.filter((v1) -> {
                return r1.containsKey(v1);
            });
            Function identity = Function.identity();
            map.getClass();
            try {
                this.storageManagerUtil.writeOffsetFile(this.storageManagerUtil.getTaskStoreDir(this.storeBaseDir, str, this.taskName, this.taskMode), (Map) filter.collect(Collectors.toMap(identity, (v1) -> {
                    return r2.get(v1);
                })), true);
            } catch (Exception e) {
                throw new SamzaException("Failed to write offset file for side input store: " + str, e);
            }
        });
    }

    public Map<SystemStreamPartition, String> getFileOffsets() {
        LOG.info("Loading initial offsets from the file for side input stores.");
        HashMap hashMap = new HashMap();
        this.stores.keySet().forEach(str -> {
            LOG.debug("Reading local offsets for store: {}", str);
            File storeLocation = getStoreLocation(str);
            if (isValidSideInputStore(str, storeLocation)) {
                try {
                    hashMap.putAll(this.storageManagerUtil.readOffsetFile(storeLocation, this.storeToSSps.get(str), true));
                } catch (Exception e) {
                    LOG.warn("Failed to load the offset file for side input store:" + str, e);
                }
            }
        });
        return hashMap;
    }

    @VisibleForTesting
    File getStoreLocation(String str) {
        return this.storageManagerUtil.getTaskStoreDir(this.storeBaseDir, str, this.taskName, this.taskMode);
    }

    private boolean isValidSideInputStore(String str, File file) {
        return isPersistedStore(str) && !this.storageManagerUtil.isStaleStore(file, STORE_DELETE_RETENTION_MS, this.clock.currentTimeMillis(), true) && this.storageManagerUtil.isOffsetFileValid(file, this.storeToSSps.get(str), true);
    }

    private boolean isPersistedStore(String str) {
        return ((Boolean) Optional.ofNullable(this.stores.get(str)).map((v0) -> {
            return v0.getStoreProperties();
        }).map((v0) -> {
            return v0.isPersistedToDisk();
        }).orElse(false)).booleanValue();
    }

    private void validateStoreConfiguration(Map<String, StorageEngine> map) {
        map.forEach((str, storageEngine) -> {
            if (storageEngine.getStoreProperties().isLoggedStore()) {
                throw new SamzaException(String.format("Cannot configure both side inputs and a changelog for store: %s.", str));
            }
        });
    }
}
