package org.apache.samza.storage;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.checkpoint.CheckpointV2;
import org.apache.samza.clustermanager.StandbyTaskUtil;
import org.apache.samza.config.Config;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.serializers.CheckpointV2Serde;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.apache.samza.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/storage/StorageManagerUtil.class */
/**
 * Helpers for locating, validating, reading and writing the offset and checkpoint marker files
 * that live inside a task's local store directories.
 *
 * <p>Four marker files are recognised inside a store directory: {@link #CHECKPOINT_FILE_NAME},
 * {@link #OFFSET_FILE_NAME_NEW}, {@link #OFFSET_FILE_NAME_LEGACY} and
 * {@link #SIDE_INPUT_OFFSET_FILE_NAME_LEGACY}.
 */
public class StorageManagerUtil {
    private static final Logger LOG = LoggerFactory.getLogger(StorageManagerUtil.class);
    public static final String CHECKPOINT_FILE_NAME = "CHECKPOINT-V2";
    public static final String OFFSET_FILE_NAME_NEW = "OFFSET-v2";
    public static final String OFFSET_FILE_NAME_LEGACY = "OFFSET";
    public static final String SIDE_INPUT_OFFSET_FILE_NAME_LEGACY = "SIDE-INPUT-OFFSETS";
    private static final ObjectMapper OBJECT_MAPPER = SamzaObjectMapper.getObjectMapper();
    private static final TypeReference<Map<SystemStreamPartition, String>> OFFSETS_TYPE_REFERENCE =
            new TypeReference<Map<SystemStreamPartition, String>>() {
            };
    private static final ObjectWriter SSP_OFFSET_OBJECT_WRITER = OBJECT_MAPPER.writerFor(OFFSETS_TYPE_REFERENCE);
    private static final String SST_FILE_SUFFIX = ".sst";
    private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();

    /**
     * Picks the offset from which reading of a source stream partition should start.
     *
     * <p>When a locally stored offset is available, the offset that
     * {@code systemAdmin.getOffsetsAfter} reports for it is used, unless the oldest offset still
     * available in the source compares greater than it; in that case the oldest offset wins and a
     * warning is logged.
     *
     * @param systemStreamPartition the partition being read
     * @param systemAdmin admin used to advance and compare offsets
     * @param str the offset read from the local offset file, or {@code null} if there is none
     * @param str2 the oldest offset available in the source stream
     * @return the offset to start from
     */
    public String getStartingOffset(SystemStreamPartition systemStreamPartition, SystemAdmin systemAdmin, String str, String str2) {
        String startingOffset = str2;
        if (str != null) {
            String resumeOffset = systemAdmin.getOffsetsAfter(ImmutableMap.of(systemStreamPartition, str)).get(systemStreamPartition);
            if (systemAdmin.offsetComparator(str2, resumeOffset) <= 0) {
                startingOffset = resumeOffset;
            } else {
                LOG.warn("Local store offset {} is lower than the oldest offset {} of the source stream. The values between these offsets cannot be restored.", resumeOffset, str2);
            }
        }
        LOG.info("Starting offset for SystemStreamPartition {} is {}, fileOffset: {}, oldestOffset from source: {}", systemStreamPartition, startingOffset, str, str2);
        return startingOffset;
    }

    /**
     * Reports whether a store directory is stale, i.e. whether its most relevant marker file was
     * last modified at least {@code j} milliseconds before {@code j2}.
     *
     * <p>A directory without any applicable marker file is treated as last modified at time 0. A
     * directory that does not exist is never reported as stale.
     *
     * @param file the store directory
     * @param j the delete-retention period in milliseconds
     * @param j2 the current time in milliseconds
     * @param z {@code true} if the store is a side-input store
     * @return {@code true} if the store directory exists and is stale
     */
    public boolean isStaleStore(File file, long j, long j2, boolean z) {
        String storePath = file.toPath().toString();
        if (!file.exists()) {
            LOG.info("Storage partition directory: {} does not exist.", storePath);
            return false;
        }
        long lastModified = offsetFileLastModified(file, z);
        if (j2 - lastModified >= j) {
            LOG.info(String.format("Store: %s is stale since lastModifiedTime of offset file: %d, is older than store deleteRetentionMs: %d.", storePath, Long.valueOf(lastModified), Long.valueOf(j)));
            return true;
        }
        return false;
    }

    /**
     * Returns the last-modified time of the first marker file that exists, checked in this order:
     * checkpoint file, new offset file, then the legacy offset file matching the store kind.
     * Returns 0 when none of them exists.
     */
    private long offsetFileLastModified(File storeDir, boolean isSideInput) {
        File checkpointFile = new File(storeDir, CHECKPOINT_FILE_NAME);
        if (checkpointFile.exists()) {
            return checkpointFile.lastModified();
        }
        File newOffsetFile = new File(storeDir, OFFSET_FILE_NAME_NEW);
        if (newOffsetFile.exists()) {
            return newOffsetFile.lastModified();
        }
        File legacyOffsetFile = new File(storeDir, OFFSET_FILE_NAME_LEGACY);
        if (!isSideInput && legacyOffsetFile.exists()) {
            return legacyOffsetFile.lastModified();
        }
        File legacySideInputOffsetFile = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
        if (isSideInput && legacySideInputOffsetFile.exists()) {
            return legacySideInputOffsetFile.lastModified();
        }
        return 0L;
    }

    /**
     * Reports whether the on-disk state of a changelog-backed store can be reused.
     *
     * <p>The store is valid when it has an entry in {@code map}, its engine reports that it is
     * persisted to disk, its offset file is valid for the store's changelog partition, and it is
     * not stale with respect to the changelog delete-retention configured for the store.
     *
     * @param str the store name
     * @param file the store directory
     * @param config job configuration, read through {@link StorageConfig}
     * @param map store name to changelog stream
     * @param taskModel supplies the changelog partition
     * @param clock supplies the current time
     * @param map2 store name to storage engine
     * @return {@code true} if the store is valid; {@code false} otherwise, including when the
     *     store has no changelog entry
     */
    public boolean isLoggedStoreValid(String str, File file, Config config, Map<String, SystemStream> map, TaskModel taskModel, Clock clock, Map<String, StorageEngine> map2) {
        long changelogDeleteRetentionMs = new StorageConfig(config).getChangeLogDeleteRetentionInMs(str);
        if (!map.containsKey(str)) {
            return false;
        }
        Set<SystemStreamPartition> changelogSsps =
                Collections.singleton(new SystemStreamPartition(map.get(str), taskModel.getChangelogPartition()));
        return map2.get(str).getStoreProperties().isPersistedToDisk()
                && isOffsetFileValid(file, changelogSsps, false)
                && !isStaleStore(file, changelogDeleteRetentionMs, clock.currentTimeMillis(), false);
    }

    /**
     * Reports whether the offset file of a store directory holds offsets for exactly the expected
     * partitions.
     *
     * @param file the store directory
     * @param set the partitions the offset file is expected to cover
     * @param z {@code true} if the store is a side-input store
     * @return {@code true} if the directory exists and the offsets read from it are non-empty and
     *     keyed by exactly {@code set}
     */
    public boolean isOffsetFileValid(File file, Set<SystemStreamPartition> set, boolean z) {
        if (!file.exists()) {
            return false;
        }
        Map<SystemStreamPartition, String> offsets = readOffsetFile(file, set, z);
        if (offsets == null) {
            LOG.info("Offset file is invalid since it does not exist. Store directory: {}", file.toPath());
            return false;
        }
        if (offsets.isEmpty()) {
            LOG.info("Offset file is invalid since it is empty. Store directory: {}", file.toPath());
            return false;
        }
        if (!offsets.keySet().equals(set)) {
            LOG.info("Offset file is invalid since changelog or side input SSPs don't match. Store directory: {}. SSPs from offset-file: {} SSPs expected: {} ", file.toPath(), offsets.keySet(), set);
            return false;
        }
        return true;
    }

    /**
     * Writes the given offsets into the store directory, in both the new and a legacy format.
     *
     * <p>The new-format file always receives the JSON-serialized map. For side-input stores the
     * legacy side-input file receives the same JSON; otherwise the legacy offset file receives
     * only the value of the first entry of {@code map}.
     *
     * @param file the store directory
     * @param map offsets keyed by partition; must be non-empty when {@code z} is {@code false}
     * @param z {@code true} if the store is a side-input store
     * @throws IOException if the offsets cannot be serialized
     * @throws java.util.NoSuchElementException if {@code map} is empty and {@code z} is
     *     {@code false}
     */
    public void writeOffsetFile(File file, Map<SystemStreamPartition, String> map, boolean z) throws IOException {
        // Serialize once; the same JSON is used for the new file and the legacy side-input file.
        String serializedOffsets = SSP_OFFSET_OBJECT_WRITER.writeValueAsString(map);
        FileUtil fileUtil = new FileUtil();
        fileUtil.writeWithChecksum(new File(file, OFFSET_FILE_NAME_NEW), serializedOffsets);
        if (z) {
            fileUtil.writeWithChecksum(new File(file, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY), serializedOffsets);
        } else {
            fileUtil.writeWithChecksum(new File(file, OFFSET_FILE_NAME_LEGACY), map.values().iterator().next());
        }
    }

    /**
     * Serializes the checkpoint and writes it, with checksum, to {@link #CHECKPOINT_FILE_NAME}
     * inside the given directory.
     *
     * @param file the store directory
     * @param checkpointV2 the checkpoint to persist
     */
    public void writeCheckpointV2File(File file, CheckpointV2 checkpointV2) {
        // Decode with an explicit charset instead of the platform default.
        // NOTE(review): assumes the serde emits UTF-8 encoded bytes - confirm against CheckpointV2Serde.
        String serializedCheckpoint = new String(CHECKPOINT_V2_SERDE.toBytes(checkpointV2), StandardCharsets.UTF_8);
        new FileUtil().writeWithChecksum(new File(file, CHECKPOINT_FILE_NAME), serializedCheckpoint);
    }

    /**
     * Deletes the new-format and legacy offset files from the given store directory, if present.
     *
     * @param file the store directory
     */
    public void deleteOffsetFile(File file) {
        deleteOffsetFile(file, OFFSET_FILE_NAME_NEW);
        deleteOffsetFile(file, OFFSET_FILE_NAME_LEGACY);
    }

    /** Removes the named file from the store directory when it exists. */
    private void deleteOffsetFile(File file, String str) {
        File offsetFile = new File(file, str);
        if (offsetFile.exists()) {
            new FileUtil().rm(offsetFile);
        }
    }

    /**
     * Reports whether the given directory exists and contains at least one entry.
     *
     * @param file the store directory
     * @return {@code true} if the directory has at least one entry; {@code false} if it does not
     *     exist, is empty, is not a directory, or cannot be listed
     */
    public boolean storeExists(File file) {
        if (!file.exists()) {
            return false;
        }
        // File.list() returns null when the path is not a directory or an I/O error occurs.
        String[] entries = file.list();
        return entries != null && entries.length > 0;
    }

    /**
     * Reads the offsets stored in a store directory, preferring the new-format file and falling
     * back to the legacy file that matches the store kind.
     *
     * @param file the store directory
     * @param set the partitions the store is expected to cover; used to interpret a legacy
     *     plain-string offset file
     * @param z {@code true} if the store is a side-input store
     * @return the offsets read; an empty map when no applicable offset file exists
     */
    public Map<SystemStreamPartition, String> readOffsetFile(File file, Set<SystemStreamPartition> set, boolean z) {
        if (new File(file, OFFSET_FILE_NAME_NEW).exists()) {
            return readOffsetFile(file, OFFSET_FILE_NAME_NEW, set);
        }
        if (!z && new File(file, OFFSET_FILE_NAME_LEGACY).exists()) {
            return readOffsetFile(file, OFFSET_FILE_NAME_LEGACY, set);
        }
        if (z && new File(file, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY).exists()) {
            return readOffsetFile(file, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY, set);
        }
        return new HashMap<>();
    }

    /**
     * Reads and deserializes {@link #CHECKPOINT_FILE_NAME} from the given directory.
     *
     * @param file the store directory
     * @return the checkpoint, or {@code null} if the checkpoint file does not exist
     */
    public CheckpointV2 readCheckpointV2File(File file) {
        File checkpointFile = new File(file, CHECKPOINT_FILE_NAME);
        if (!checkpointFile.exists()) {
            return null;
        }
        String serializedCheckpoint = new FileUtil().readWithChecksum(checkpointFile);
        // Encode with an explicit charset instead of the platform default.
        return CHECKPOINT_V2_SERDE.fromBytes(serializedCheckpoint.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Reads one named offset file from a store directory.
     *
     * <p>The content is first parsed as a JSON map of partition to offset. If JSON parsing or
     * mapping fails, the content is treated as a single plain offset string and, when exactly one
     * partition is expected, mapped to that partition. Any other failure is logged and whatever
     * was read so far (normally an empty map) is returned.
     *
     * @param file the store directory
     * @param str the offset file name
     * @param set the partitions the store is expected to cover
     * @return the offsets read; empty when the file is missing or unreadable
     */
    private Map<SystemStreamPartition, String> readOffsetFile(File file, String str, Set<SystemStreamPartition> set) {
        Map<SystemStreamPartition, String> offsets = new HashMap<>();
        File offsetFile = new File(file, str);
        String storePath = file.getPath();
        if (!offsetFile.exists()) {
            LOG.info("No offset file found in storage partition directory: {}", storePath);
            return offsets;
        }

        LOG.debug("Found offset file in storage partition directory: {}", storePath);
        String fileContents = null;
        try {
            fileContents = new FileUtil().readWithChecksum(offsetFile);
            offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
        } catch (JsonParseException | JsonMappingException e) {
            // Must precede the generic handler below, otherwise this fallback is unreachable.
            LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", file.toPath(), str);
            if (set.size() == 1) {
                final String plainOffset = fileContents;
                offsets = set.stream().collect(Collectors.toMap(ssp -> ssp, ssp -> plainOffset));
            }
        } catch (Exception e) {
            LOG.warn("Failed to read offset file in storage partition directory: {}", storePath, e);
        }
        return offsets;
    }

    /**
     * Builds the store directory for a task: {@code <file>/<str>/<task name>}, with spaces in the
     * relative part replaced by underscores. For standby tasks the name of the corresponding
     * active task, as given by {@link StandbyTaskUtil#getActiveTaskName}, is used.
     *
     * @param file the base directory for stores
     * @param str the store name
     * @param taskName the task name
     * @param taskMode the task mode
     * @return the task store directory (not created by this method)
     */
    public File getTaskStoreDir(File file, String str, TaskName taskName, TaskMode taskMode) {
        TaskName effectiveTaskName = taskMode.equals(TaskMode.Standby)
                ? StandbyTaskUtil.getActiveTaskName(taskName)
                : taskName;
        String relativePath = (str + File.separator + effectiveTaskName.toString()).replace(' ', '_');
        return new File(file, relativePath);
    }

    /**
     * Lists the entries under {@code <file>/<str>} whose name contains the task store directory
     * name followed by {@code "-"}.
     *
     * @param file the base directory for stores
     * @param str the store name
     * @param taskName the task name
     * @param taskMode the task mode
     * @return the matching entries; an empty list if the store base directory does not exist
     * @throws SamzaException if the store base directory cannot be listed
     */
    public List<File> getTaskStoreCheckpointDirs(File file, String str, TaskName taskName, TaskMode taskMode) {
        File storeBaseDir = new File(file, str);
        String checkpointDirMarker = getTaskStoreDir(file, str, taskName, taskMode).getName() + "-";
        if (!storeBaseDir.exists()) {
            return Collections.emptyList();
        }
        // Files.list holds an open directory handle until the stream is closed.
        try (Stream<Path> children = Files.list(storeBaseDir.toPath())) {
            return children
                    .map(Path::toFile)
                    .filter(child -> child.getName().contains(checkpointDirMarker))
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new SamzaException(String.format("Error finding checkpoint dirs for task: %s mode: %s store: %s in dir: %s", taskName, taskMode, str, file), e);
        }
    }

    /**
     * Builds the checkpoint directory path for a store directory:
     * {@code <store dir path>-<serialized checkpoint id>}.
     *
     * @param file the task store directory
     * @param checkpointId the checkpoint id
     * @return the checkpoint directory path
     */
    public String getStoreCheckpointDir(File file, CheckpointId checkpointId) {
        return file.getPath() + "-" + checkpointId.serialize();
    }

    /**
     * Recreates a store directory from a checkpoint directory. Files ending in {@code .sst} are
     * hard-linked into the new directory; every other entry is copied.
     *
     * @param file the checkpoint directory to restore from
     * @param file2 the store directory to create; expected not to exist yet
     * @throws SamzaException if the directory cannot be created or any entry cannot be restored
     */
    public void restoreCheckpointFiles(File file, File file2) {
        assert !file2.exists() : "Store dir already exists: " + file2;
        try {
            Files.createDirectory(file2.toPath());
            // File.listFiles() returns null when the path is not a directory or cannot be read.
            File[] checkpointFiles = file.listFiles();
            if (checkpointFiles == null) {
                throw new IOException("Unable to list files in checkpoint dir: " + file);
            }
            for (File checkpointFile : checkpointFiles) {
                String fileName = checkpointFile.getName();
                Path target = new File(file2, fileName).toPath();
                if (fileName.endsWith(SST_FILE_SUFFIX)) {
                    Files.createLink(target, checkpointFile.toPath());
                } else {
                    Files.copy(checkpointFile.toPath(), target, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        } catch (Exception e) {
            throw new SamzaException(String.format("Failed to restore store from checkpoint dir: %s to current dir: %s", file, file2), e);
        }
    }
}
