package org.apache.samza.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.checkpoint.CheckpointV1;
import org.apache.samza.checkpoint.CheckpointV2;
import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker;
import org.apache.samza.config.Config;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.StorageEngineFactory;
import org.apache.samza.system.ChangelogSSPIterator;
import org.apache.samza.system.SSPMetadataCache;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.util.Clock;
import org.apache.samza.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/storage/TransactionalStateTaskRestoreManager.class */
public class TransactionalStateTaskRestoreManager implements TaskRestoreManager {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionalStateTaskRestoreManager.class);
    // Task whose stores are restored; supplies the task name, task mode and changelog partition.
    private final TaskModel taskModel;
    // Store name -> storage engine. Built once in the constructor by createStoreEngines.
    private final Map<String, StorageEngine> storeEngines;
    // Store name -> changelog stream. createStoreEngines uses membership in this map to pick the
    // logged vs. non-logged base directory for a store.
    private final Map<String, SystemStream> storeChangelogs;
    // Used to look up the SystemAdmin of a changelog's system (offset comparison, getOffsetsAfter).
    private final SystemAdmins systemAdmins;
    // Store name -> consumer on which changelog starting offsets are registered and which feeds restore().
    private final Map<String, SystemConsumer> storeConsumers;
    // Source of the current oldest/newest/upcoming offsets for each changelog partition.
    private final SSPMetadataCache sspMetadataCache;
    // Base directory for stores that have a changelog.
    private final File loggedStoreBaseDirectory;
    // Base directory for stores that have no changelog.
    private final File nonLoggedStoreBaseDirectory;
    private final Config config;
    // Passed through to StorageManagerUtil.isLoggedStoreValid.
    private final Clock clock;
    private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
    private final FileUtil fileUtil = new FileUtil();
    // Executor on which restore() runs the per-store restores.
    private final ExecutorService restoreExecutor;
    // Computed by init(); read by restore(). Null until init() has been called.
    private StoreActions storeActions;
    // Changelog partition metadata snapshot taken by init(). Null until init() has been called.
    private Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> currentChangelogOffsets;

    /**
     * Immutable pair of changelog offsets describing the range a store should be restored over.
     * {@code endingOffset} may be {@code null}; {@code validateRestoreOffsets} only checks ordering
     * when it is non-null.
     */
    @VisibleForTesting
    public static class RestoreOffsets {
        final String startingOffset;
        final String endingOffset;

        RestoreOffsets(String startingOffset, String endingOffset) {
            this.startingOffset = startingOffset;
            this.endingOffset = endingOffset;
        }

        /** Renders both offsets for log output; a {@code null} offset is printed as "null". */
        @Override
        public String toString() {
            return "startingOffset: " + startingOffset + ", endingOffset: " + endingOffset;
        }
    }

    /**
     * Result of planning a task's restore: which checkpoint directory to keep per store, which
     * directories to delete per store, and which changelog range to replay per store.
     * The collections are stored as passed in; no defensive copies are made.
     */
    @VisibleForTesting
    public static class StoreActions {
        /** Store name -> the single checkpoint directory to restore into the current directory. */
        final Map<String, File> storeDirsToRetain;
        /** Store name -> every directory (current and/or checkpoint) that must be removed. */
        final ListMultimap<String, File> storeDirsToDelete;
        /** Store name -> changelog offset range to restore. */
        final Map<String, RestoreOffsets> storesToRestore;

        StoreActions(Map<String, File> storeDirsToRetain,
                     ListMultimap<String, File> storeDirsToDelete,
                     Map<String, RestoreOffsets> storesToRestore) {
            this.storeDirsToRetain = storeDirsToRetain;
            this.storeDirsToDelete = storeDirsToDelete;
            this.storesToRestore = storesToRestore;
        }
    }

    /**
     * Captures the collaborators needed for restore and builds the storage engines for this task.
     *
     * @param storeNames names of the stores whose engines are created here (via
     *                   {@code ContainerStorageManager.createStore})
     * @param jobContext forwarded to store creation
     * @param containerContext forwarded to store creation
     * @param taskModel task being restored
     * @param restoreExecutor executor on which {@link #restore()} runs
     * @param storeChangelogs store name to changelog stream
     * @param existingStoreEngines already-built engines that are copied into the engine map as-is
     * @param storageEngineFactories store name to engine factory, forwarded to store creation
     * @param serdes serde name to serde, forwarded to store creation
     * @param systemAdmins admins used for offset comparison and lookup
     * @param storeConsumers store name to changelog consumer
     * @param metricsRegistry forwarded to store creation
     * @param messageCollector forwarded to store creation
     * @param sspMetadataCache source of current changelog offsets
     * @param loggedStoreBaseDirectory base directory for stores that have a changelog
     * @param nonLoggedStoreBaseDirectory base directory for stores without a changelog
     * @param config job configuration
     * @param clock clock handed to store validity checks
     */
    public TransactionalStateTaskRestoreManager(
            Set<String> storeNames,
            JobContext jobContext,
            ContainerContext containerContext,
            TaskModel taskModel,
            ExecutorService restoreExecutor,
            Map<String, SystemStream> storeChangelogs,
            Map<String, StorageEngine> existingStoreEngines,
            Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
            Map<String, Serde<Object>> serdes,
            SystemAdmins systemAdmins,
            Map<String, SystemConsumer> storeConsumers,
            MetricsRegistry metricsRegistry,
            MessageCollector messageCollector,
            SSPMetadataCache sspMetadataCache,
            File loggedStoreBaseDirectory,
            File nonLoggedStoreBaseDirectory,
            Config config,
            Clock clock) {
        // Task identity and configuration.
        this.taskModel = taskModel;
        this.config = config;
        this.clock = clock;

        // On-disk layout.
        this.loggedStoreBaseDirectory = loggedStoreBaseDirectory;
        this.nonLoggedStoreBaseDirectory = nonLoggedStoreBaseDirectory;

        // Changelog access.
        this.storeChangelogs = storeChangelogs;
        this.storeConsumers = storeConsumers;
        this.systemAdmins = systemAdmins;
        this.sspMetadataCache = sspMetadataCache;
        this.restoreExecutor = restoreExecutor;

        // Must come last: createStoreEngines reads the fields assigned above.
        this.storeEngines = createStoreEngines(storeNames, jobContext, containerContext,
                storageEngineFactories, serdes, metricsRegistry, messageCollector, existingStoreEngines);
    }

    /**
     * Plans and prepares the restore for this task: reads the checkpointed and current changelog
     * offsets, computes the {@link StoreActions}, applies the directory deletions/moves, and
     * registers changelog starting offsets with the store consumers.
     *
     * @param checkpoint last checkpoint for the task; may be {@code null}
     */
    public void init(Checkpoint checkpoint) {
        final Map<String, KafkaStateCheckpointMarker> checkpointedOffsets =
                getCheckpointedChangelogOffsets(checkpoint);
        this.currentChangelogOffsets =
                getCurrentChangelogOffsets(taskModel, storeChangelogs, sspMetadataCache);
        final CheckpointId checkpointId = getCheckpointId(checkpoint);

        this.storeActions = getStoreActions(
                taskModel, storeEngines, storeChangelogs, checkpointedOffsets, checkpointId,
                currentChangelogOffsets, systemAdmins, storageManagerUtil,
                loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);

        setupStoreDirs(taskModel, storeEngines, storeActions, storageManagerUtil, fileUtil,
                loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
        registerStartingOffsets(taskModel, storeActions, storeChangelogs, systemAdmins,
                storeConsumers, currentChangelogOffsets);
    }

    /**
     * Restores every store listed in {@code storeActions.storesToRestore} from its changelog,
     * one after another, on the restore executor.
     *
     * <p>{@link #init(Checkpoint)} must have been called first; it populates the store actions
     * and the current changelog offsets read here.
     *
     * @return a future that completes when all stores are restored, or completes exceptionally
     *         with a {@link SamzaException} if a restore is interrupted
     */
    public CompletableFuture<Void> restore() {
        return CompletableFuture.runAsync(() -> {
            for (Map.Entry<String, RestoreOffsets> storeToRestore : this.storeActions.storesToRestore.entrySet()) {
                String storeName = storeToRestore.getKey();
                String endOffset = storeToRestore.getValue().endingOffset;
                SystemStream changelog = this.storeChangelogs.get(storeName);
                SystemAdmin systemAdmin = this.systemAdmins.getSystemAdmin(changelog.getSystem());
                SystemConsumer systemConsumer = this.storeConsumers.get(storeName);
                SystemStreamPartition changelogSSP =
                        new SystemStreamPartition(changelog, this.taskModel.getChangelogPartition());
                String newestOffset = this.currentChangelogOffsets.get(changelogSSP).getNewestOffset();
                ChangelogSSPIterator changelogSSPIterator =
                        new ChangelogSSPIterator(systemConsumer, changelogSSP, endOffset, systemAdmin, true, newestOffset);
                StorageEngine storageEngine = this.storeEngines.get(storeName);

                LOG.info("Restoring store: {} for task: {}", storeName, this.taskModel.getTaskName());
                try {
                    storageEngine.restore(changelogSSPIterator);
                } catch (InterruptedException e) {
                    // Catching InterruptedException clears the interrupt flag; set it again so code
                    // further up the stack can still observe the interruption.
                    Thread.currentThread().interrupt();
                    // Wrap in an unchecked exception so it can propagate out of the lambda.
                    throw new SamzaException(String.format("Interrupted while restoring store: %s for task: %s",
                            storeName, this.taskModel.getTaskName().getTaskName()), e);
                }
            }
        }, this.restoreExecutor);
    }

    /**
     * Stops every storage engine that is persisted to disk. Engines that are not persisted to
     * disk are left untouched.
     */
    public void close() {
        TaskName taskName = this.taskModel.getTaskName();
        this.storeEngines.forEach((storeName, storageEngine) -> {
            if (storageEngine.getStoreProperties().isPersistedToDisk()) {
                storageEngine.stop();
                // Log only for stores that were actually stopped; previously this line also fired
                // for non-persistent stores that were skipped.
                LOG.info("Stopped persistent store: {} in task: {}", storeName, taskName);
            }
        });
    }

    /**
     * Builds the store name to storage engine map for this task.
     *
     * <p>The already-built engines are copied in first; then one engine is created in
     * {@code BulkLoad} mode for each name in {@code storeNames}. A created engine replaces a
     * pre-existing one registered under the same name.
     *
     * @param storeNames names of the stores to create engines for
     * @param jobContext forwarded to {@code ContainerStorageManager.createStore}
     * @param containerContext forwarded to {@code ContainerStorageManager.createStore}
     * @param storageEngineFactories forwarded to {@code ContainerStorageManager.createStore}
     * @param serdes forwarded to {@code ContainerStorageManager.createStore}
     * @param metricsRegistry forwarded to {@code ContainerStorageManager.createStore}
     * @param messageCollector forwarded to {@code ContainerStorageManager.createStore}
     * @param existingStoreEngines engines that already exist and are reused as-is
     * @return a new mutable map containing both the existing and the newly created engines
     */
    private Map<String, StorageEngine> createStoreEngines(
            Set<String> storeNames,
            JobContext jobContext,
            ContainerContext containerContext,
            Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
            Map<String, Serde<Object>> serdes,
            MetricsRegistry metricsRegistry,
            MessageCollector messageCollector,
            Map<String, StorageEngine> existingStoreEngines) {
        Map<String, StorageEngine> engines = new HashMap<>();
        engines.putAll(existingStoreEngines);

        for (String storeName : storeNames) {
            // A store with a changelog lives under the logged base directory, otherwise the non-logged one.
            File storeBaseDir = this.storeChangelogs.containsKey(storeName)
                    ? this.loggedStoreBaseDirectory
                    : this.nonLoggedStoreBaseDirectory;
            File storeDirectory = this.storageManagerUtil.getTaskStoreDir(
                    storeBaseDir, storeName, this.taskModel.getTaskName(), this.taskModel.getTaskMode());
            engines.put(storeName, ContainerStorageManager.createStore(
                    storeName, storeDirectory, this.taskModel, jobContext, containerContext,
                    storageEngineFactories, serdes, metricsRegistry, messageCollector,
                    StorageEngineFactory.StoreMode.BulkLoad, this.storeChangelogs, this.config));
        }
        return engines;
    }

    /**
     * Looks up the current partition metadata of every store changelog for this task's changelog
     * partition.
     *
     * @param taskModel task whose changelog partition is used
     * @param storeChangelogs store name to changelog stream
     * @param sspMetadataCache cache queried for each changelog partition
     * @return changelog partition to the metadata returned by the cache
     */
    @VisibleForTesting
    static Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> getCurrentChangelogOffsets(
            TaskModel taskModel, Map<String, SystemStream> storeChangelogs, SSPMetadataCache sspMetadataCache) {
        final Partition changelogPartition = taskModel.getChangelogPartition();
        final Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> changelogOffsets =
                new HashMap<>();

        for (SystemStream changelog : storeChangelogs.values()) {
            SystemStreamPartition changelogSSP =
                    new SystemStreamPartition(changelog.getSystem(), changelog.getStream(), changelogPartition);
            changelogOffsets.put(changelogSSP, sspMetadataCache.getMetadata(changelogSSP));
        }

        LOG.info("Got current changelog offsets for taskName: {} as: {}", taskModel.getTaskName(), changelogOffsets);
        return changelogOffsets;
    }

    /**
     * Decides, for every store of the task, which directories to delete, which checkpoint directory
     * (if any) to retain, and which changelog offset range to restore.
     *
     * <p>Per store, in order:
     * <ol>
     *   <li>Neither persisted nor logged: nothing is done.</li>
     *   <li>Persisted but not logged: the current directory is marked for deletion.</li>
     *   <li>Persisted, logged, and clean-on-start configured: current and checkpoint directories are
     *       marked for deletion; restore from oldest offset to the checkpointed offset.</li>
     *   <li>Otherwise the current directory (if persisted) is always marked for deletion, and:
     *     <ul>
     *       <li>no checkpointed offset but the changelog has an oldest offset: checkpoint directories
     *           are deleted; restore oldest..newest when retain-existing-state is set, else
     *           oldest..{@code null};</li>
     *       <li>checkpointed offset outside [oldest, newest]: checkpoint directories are deleted;
     *           restore oldest..newest;</li>
     *       <li>non-persistent logged store: restore oldest..checkpointed;</li>
     *       <li>otherwise the valid checkpoint directory with the greatest local offset in
     *           [oldest, target] is retained and restore runs from that offset to the target; all
     *           other checkpoint directories are deleted. If none qualifies, restore
     *           oldest..target. The target is the checkpointed offset, or the newest offset when the
     *           checkpoint is older than 0.9 * min.compaction.lag.ms.</li>
     *     </ul>
     *   </li>
     * </ol>
     *
     * @param taskModel task being restored
     * @param storeEngines store name to engine
     * @param storeChangelogs store name to changelog stream
     * @param checkpointedChangelogOffsets store name to checkpoint marker from the last checkpoint
     * @param checkpointId id of the last checkpoint, or {@code null} if there is none
     * @param currentChangelogOffsets changelog partition to its current metadata
     * @param systemAdmins admins used for offset comparison
     * @param storageManagerUtil directory and offset-file helper
     * @param loggedStoreBaseDirectory base directory of logged stores
     * @param nonLoggedStoreBaseDirectory base directory of non-logged stores
     * @param config job configuration
     * @param clock passed to the logged-store validity check
     * @return the planned actions
     */
    @VisibleForTesting
    static StoreActions getStoreActions(
            TaskModel taskModel,
            Map<String, StorageEngine> storeEngines,
            Map<String, SystemStream> storeChangelogs,
            Map<String, KafkaStateCheckpointMarker> checkpointedChangelogOffsets,
            CheckpointId checkpointId,
            Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> currentChangelogOffsets,
            SystemAdmins systemAdmins,
            StorageManagerUtil storageManagerUtil,
            File loggedStoreBaseDirectory,
            File nonLoggedStoreBaseDirectory,
            Config config,
            Clock clock) {
        final TaskName taskName = taskModel.getTaskName();
        final TaskMode taskMode = taskModel.getTaskMode();
        final Map<String, File> storeDirsToRetain = new HashMap<>();
        final ListMultimap<String, File> storeDirsToDelete = ArrayListMultimap.create();
        final Map<String, RestoreOffsets> storesToRestore = new HashMap<>();

        storeEngines.forEach((storeName, storageEngine) -> {
            final boolean persistedToDisk = storageEngine.getStoreProperties().isPersistedToDisk();
            final boolean loggedStore = storageEngine.getStoreProperties().isLoggedStore();

            // Nothing on disk and nothing in a changelog: nothing to clean up or restore.
            if (!persistedToDisk && !loggedStore) {
                return;
            }

            // Persistent but non-logged stores cannot be restored, so their directory is always deleted.
            if (persistedToDisk && !loggedStore) {
                File nonLoggedDir =
                        storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskName, taskMode);
                LOG.info("Marking current directory: {} for store: {} in task: {} for deletion since it is not a logged store.", nonLoggedDir, storeName, taskName);
                storeDirsToDelete.put(storeName, nonLoggedDir);
                return;
            }

            final SystemStreamPartition changelogSSP =
                    new SystemStreamPartition(storeChangelogs.get(storeName), taskModel.getChangelogPartition());
            final SystemAdmin admin = systemAdmins.getSystemAdmin(changelogSSP.getSystem());
            final SystemStreamMetadata.SystemStreamPartitionMetadata changelogMetadata =
                    currentChangelogOffsets.get(changelogSSP);
            final String oldestOffset = changelogMetadata.getOldestOffset();
            final String newestOffset = changelogMetadata.getNewestOffset();

            // A missing marker or a blank offset both mean "no checkpointed offset".
            final KafkaStateCheckpointMarker marker = checkpointedChangelogOffsets.get(storeName);
            final String markerOffset = marker == null ? null : marker.getChangelogOffset();
            final String checkpointedOffset = StringUtils.isNotBlank(markerOffset) ? markerOffset : null;

            // NOTE: measured with the system clock, not the injected clock.
            final long timeSinceLastCheckpointInMs =
                    checkpointId == null ? Long.MAX_VALUE : System.currentTimeMillis() - checkpointId.getMillis();

            // Clean-on-start: wipe everything local and restore from the oldest offset.
            if (persistedToDisk && new StorageConfig(config).cleanLoggedStoreDirsOnStart(storeName)) {
                File dirToClean =
                        storageManagerUtil.getTaskStoreDir(loggedStoreBaseDirectory, storeName, taskName, taskMode);
                LOG.info("Marking current directory: {} for store: {} in task: {} for deletion due to clean.on.container.start config.", dirToClean, storeName, taskName);
                storeDirsToDelete.put(storeName, dirToClean);
                storageManagerUtil.getTaskStoreCheckpointDirs(loggedStoreBaseDirectory, storeName, taskName, taskMode)
                        .forEach(checkpointDirToClean -> {
                            LOG.info("Marking checkpoint directory: {} for store: {} in task: {} for deletion due to clean.on.container.start config.", checkpointDirToClean, storeName, taskName);
                            storeDirsToDelete.put(storeName, checkpointDirToClean);
                        });
                LOG.info("Marking restore offsets for store: {} in task: {} to {}, {} ", storeName, taskName, oldestOffset, checkpointedOffset);
                storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
                return;
            }

            // Only persistent stores have local directories; both are empty for in-memory logged stores.
            final Optional<File> currentDir;
            final Optional<List<File>> checkpointDirs;
            if (persistedToDisk) {
                currentDir = Optional.of(
                        storageManagerUtil.getTaskStoreDir(loggedStoreBaseDirectory, storeName, taskName, taskMode));
                checkpointDirs = Optional.of(
                        storageManagerUtil.getTaskStoreCheckpointDirs(loggedStoreBaseDirectory, storeName, taskName, taskMode));
            } else {
                currentDir = Optional.empty();
                checkpointDirs = Optional.empty();
            }
            LOG.info("For store: {} in task: {} got current dir: {}, checkpoint dirs: {}, checkpointed changelog offset: {}", storeName, taskName, currentDir, checkpointDirs, checkpointedOffset);

            // The current directory is always deleted; state is rebuilt from a checkpoint dir and/or the changelog.
            currentDir.ifPresent(dir -> {
                LOG.info("Marking current directory: {} for store: {} in task: {} for deletion.", dir, storeName, taskName);
                storeDirsToDelete.put(storeName, dir);
            });

            // No checkpointed offset although the changelog has content.
            if (checkpointedOffset == null && oldestOffset != null) {
                checkpointDirs.ifPresent(dirs -> dirs.forEach(dir -> {
                    LOG.info("Marking checkpoint directory: {} for store: {} in task: {} for deletion since checkpointed offset is null and oldest offset: {} is not.", dir, storeName, taskName, oldestOffset);
                    storeDirsToDelete.put(storeName, dir);
                }));
                if (new TaskConfig(config).getTransactionalStateRetainExistingState()) {
                    LOG.warn("Checkpointed offset for store: {} in task: {} is null. Since retain existing state is true, local state will be fully restored from current changelog contents. There is no transactional local state guarantee.", storeName, taskName);
                    storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, newestOffset));
                } else {
                    LOG.warn("Checkpointed offset for store: {} in task: {} is null. Since retain existing state is false, any local state and changelog topic contents will be deleted.", storeName, taskName);
                    storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, null));
                }
                return;
            }

            // Checkpointed offset is outside the range currently present in the changelog.
            if (admin.offsetComparator(oldestOffset, checkpointedOffset) > 0
                    || admin.offsetComparator(checkpointedOffset, newestOffset) > 0) {
                LOG.warn("Checkpointed offset: {} for store: {} in task: {} is out of range of oldest: {} or newest: {} offset.Deleting existing store and fully restoring from changelog topic from oldest to newest offset. If the topic has time-based retention, there is no transactional local state guarantees. If the topic was changed,local state will be cleaned up and fully restored to match the new topic contents.", checkpointedOffset, storeName, taskName, oldestOffset, newestOffset);
                checkpointDirs.ifPresent(dirs -> dirs.forEach(dir -> storeDirsToDelete.put(storeName, dir)));
                storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, newestOffset));
                return;
            }

            // Logged but non-persistent store: there is no local state to reuse.
            if (!checkpointDirs.isPresent()) {
                LOG.info("Did not find any checkpoint directories for logged (maybe non-persistent) store: {}. Local state will be fully restored from current changelog contents.", storeName);
                storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
                return;
            }

            // Pick the offset to restore up to: the checkpointed offset, unless the checkpoint is old
            // enough that the changelog may already have been compacted past it.
            final long minCompactionLagMs = new StorageConfig(config).getChangelogMinCompactionLagMs(storeName);
            final String targetOffset;
            if (timeSinceLastCheckpointInMs > 0.9d * minCompactionLagMs) {
                LOG.warn("Checkpointed offset for store: {} in task: {} is: {}. It is in range of oldest: {} and newest: {} changelog offset. However, time since last checkpoint is: {}, which is greater than 0.9 * min.compaction.lag.ms: {} for the changelog topic. Since there is a chance thatthe changelog topic has been compacted, restoring store to the end of the current changelog contents.There is no transactional local state guarantee.", storeName, taskName, checkpointedOffset, oldestOffset, newestOffset, timeSinceLastCheckpointInMs, minCompactionLagMs);
                targetOffset = newestOffset;
            } else {
                targetOffset = checkpointedOffset;
            }

            // Retain the valid checkpoint dir whose local offset is in [oldest, target] and is the
            // greatest seen so far; a later, better candidate overrides an earlier one.
            boolean hasValidCheckpointDir = false;
            for (File checkpointDir : checkpointDirs.get()) {
                if (!storageManagerUtil.isLoggedStoreValid(
                        storeName, checkpointDir, config, storeChangelogs, taskModel, clock, storeEngines)) {
                    continue;
                }
                String localOffset = storageManagerUtil
                        .readOffsetFile(checkpointDir, Collections.singleton(changelogSSP), false)
                        .get(changelogSSP);
                LOG.info("Read local offset: {} for store: {} checkpoint dir: {} in task: {}", localOffset, storeName, checkpointDir, taskName);

                if (admin.offsetComparator(localOffset, oldestOffset) >= 0
                        && admin.offsetComparator(localOffset, targetOffset) <= 0
                        && (storesToRestore.get(storeName) == null
                            || admin.offsetComparator(localOffset, storesToRestore.get(storeName).startingOffset) > 0)) {
                    hasValidCheckpointDir = true;
                    LOG.info("Temporarily marking checkpoint dir: {} for store: {} in task: {} for retention. May be overridden later.", checkpointDir, storeName, taskName);
                    storeDirsToRetain.put(storeName, checkpointDir);
                    LOG.info("Temporarily marking store: {} in task: {} for restore from beginning offset: {} to ending offset: {}. May be overridden later", storeName, taskName, localOffset, targetOffset);
                    storesToRestore.put(storeName, new RestoreOffsets(localOffset, targetOffset));
                }
            }

            // Every checkpoint dir other than the retained one is deleted.
            final File retainedDir = storeDirsToRetain.get(storeName);
            for (File checkpointDir : checkpointDirs.get()) {
                if (retainedDir == null || !retainedDir.equals(checkpointDir)) {
                    LOG.info("Marking checkpoint directory: {} for store: {} in task: {} for deletion since it is not marked for retention.", checkpointDir, storeName, taskName);
                    storeDirsToDelete.put(storeName, checkpointDir);
                }
            }

            // No usable local checkpoint: rebuild from the changelog.
            if (!hasValidCheckpointDir) {
                storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, targetOffset));
            }
        });

        LOG.info("Store directories to be retained in Task: {} are: {}", taskName, storeDirsToRetain);
        LOG.info("Store directories to be deleted in Task: {} are: {}", taskName, storeDirsToDelete);
        LOG.info("Stores to be restored in Task: {} are: {}", taskName, storesToRestore);
        return new StoreActions(storeDirsToRetain, storeDirsToDelete, storesToRestore);
    }

    /**
     * Applies the directory part of the planned {@link StoreActions}, in this order: deletes every
     * directory marked for deletion, restores each retained checkpoint directory into the store's
     * current directory, then creates the current directory of any persistent store that still
     * has none.
     *
     * @param taskModel task being restored
     * @param storeEngines store name to engine
     * @param storeActions planned actions
     * @param storageManagerUtil directory helper
     * @param fileUtil file-system helper
     * @param loggedStoreBaseDirectory base directory of logged stores
     * @param nonLoggedStoreBaseDirectory base directory of non-logged stores
     * @throws SamzaException if a current directory cannot be checked or created
     */
    @VisibleForTesting
    static void setupStoreDirs(
            TaskModel taskModel,
            Map<String, StorageEngine> storeEngines,
            StoreActions storeActions,
            StorageManagerUtil storageManagerUtil,
            FileUtil fileUtil,
            File loggedStoreBaseDirectory,
            File nonLoggedStoreBaseDirectory) {
        TaskName taskName = taskModel.getTaskName();
        TaskMode taskMode = taskModel.getTaskMode();
        ListMultimap<String, File> storeDirsToDelete = storeActions.storeDirsToDelete;
        Map<String, File> storeDirsToRetain = storeActions.storeDirsToRetain;

        // Deletions go first so that a stale current directory is gone before a checkpoint is restored into it.
        storeDirsToDelete.entries().forEach(entry -> {
            String storeName = entry.getKey();
            File dirToDelete = entry.getValue();
            LOG.info("Deleting persistent store directory: {} for store: {} in task: {}", dirToDelete, storeName, taskName);
            fileUtil.rm(dirToDelete);
        });

        storeDirsToRetain.forEach((storeName, checkpointDir) -> {
            File currentDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDirectory, storeName, taskName, taskMode);
            // Log the directory actually being moved, not the whole retain map.
            LOG.info("Moving logged store checkpoint directory: {} for store: {} in task: {} to current directory: {}", checkpointDir, storeName, taskName, currentDir);
            storageManagerUtil.restoreCheckpointFiles(checkpointDir, currentDir);
        });

        storeEngines.forEach((storeName, storageEngine) -> {
            if (!storageEngine.getStoreProperties().isPersistedToDisk()) {
                return;
            }
            File currentDir = storageEngine.getStoreProperties().isLoggedStore()
                    ? storageManagerUtil.getTaskStoreDir(loggedStoreBaseDirectory, storeName, taskName, taskMode)
                    : storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskName, taskMode);
            try {
                if (!fileUtil.exists(currentDir.toPath())) {
                    LOG.info("Creating missing persistent store current directory: {} for store: {} in task: {}", currentDir, storeName, taskName);
                    fileUtil.createDirectories(currentDir.toPath());
                }
            } catch (Exception e) {
                throw new SamzaException(String.format("Error setting up current directory for store: %s", storeName), e);
            }
        });
    }

    /**
     * Registers a starting offset with the consumer of every changelog partition.
     *
     * <p>Every changelog is first registered at its upcoming offset. Stores that need restoring are
     * then registered again: at the oldest offset when the restore starts there, otherwise at the
     * offset following the restore's starting offset.
     *
     * @param taskModel task being restored
     * @param storeActions planned actions; only {@code storesToRestore} is read
     * @param storeChangelogs store name to changelog stream
     * @param systemAdmins admins used for offset comparison and next-offset lookup
     * @param storeConsumers store name to changelog consumer
     * @param currentChangelogOffsets changelog partition to its current metadata
     * @throws IllegalStateException if a restore's ending offset precedes its starting offset
     */
    @VisibleForTesting
    static void registerStartingOffsets(
            TaskModel taskModel,
            StoreActions storeActions,
            Map<String, SystemStream> storeChangelogs,
            SystemAdmins systemAdmins,
            Map<String, SystemConsumer> storeConsumers,
            Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> currentChangelogOffsets) {
        Map<String, RestoreOffsets> storesToRestore = storeActions.storesToRestore;

        storeChangelogs.forEach((storeName, changelog) -> {
            SystemStreamPartition changelogSSP = new SystemStreamPartition(changelog, taskModel.getChangelogPartition());
            SystemConsumer systemConsumer = storeConsumers.get(storeName);
            String upcomingOffset = currentChangelogOffsets.get(changelogSSP).getUpcomingOffset();
            LOG.info("Temporarily registering upcoming offset: {} as the starting offest for changelog ssp: {}. This might be overridden later for stores that need restoring.", upcomingOffset, changelogSSP);
            systemConsumer.register(changelogSSP, upcomingOffset);
        });

        storesToRestore.forEach((storeName, restoreOffsets) -> {
            SystemStream changelog = storeChangelogs.get(storeName);
            SystemStreamPartition changelogSSP = new SystemStreamPartition(changelog, taskModel.getChangelogPartition());
            SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(changelog.getSystem());
            validateRestoreOffsets(restoreOffsets, systemAdmin);

            SystemConsumer systemConsumer = storeConsumers.get(storeName);
            String oldestOffset = currentChangelogOffsets.get(changelogSSP).getOldestOffset();

            // Starting at the oldest offset is inclusive; any other starting offset is exclusive,
            // so consumption begins at the offset after it.
            String startingOffset;
            if (systemAdmin.offsetComparator(restoreOffsets.startingOffset, oldestOffset) == 0) {
                startingOffset = oldestOffset;
            } else {
                Map<SystemStreamPartition, String> offsetToAdvance =
                        ImmutableMap.of(changelogSSP, restoreOffsets.startingOffset);
                startingOffset = systemAdmin.getOffsetsAfter(offsetToAdvance).get(changelogSSP);
            }
            LOG.info("Registering starting offset: {} for changelog ssp: {}", startingOffset, changelogSSP);
            systemConsumer.register(changelogSSP, startingOffset);
        });
    }

    /**
     * Checks that a restore range is not inverted. A {@code null} ending offset is accepted
     * without any comparison.
     *
     * @param restoreOffsets range to check
     * @param systemAdmin admin whose comparator defines offset ordering
     * @throws IllegalStateException if the ending offset compares lower than the starting offset
     */
    private static void validateRestoreOffsets(RestoreOffsets restoreOffsets, SystemAdmin systemAdmin) {
        final String startingOffset = restoreOffsets.startingOffset;
        final String endingOffset = restoreOffsets.endingOffset;
        if (endingOffset == null) {
            return;
        }
        final boolean endNotBeforeStart = systemAdmin.offsetComparator(endingOffset, startingOffset) >= 0;
        Preconditions.checkState(endNotBeforeStart,
                String.format("Ending offset: %s must be equal to or greater than starting offset: %s", endingOffset, startingOffset));
    }

    /**
     * Extracts the checkpointed changelog marker of each store from a checkpoint.
     *
     * <p>For a {@link CheckpointV2} the markers stored under the Kafka state backend factory name
     * are deserialized. For a {@link CheckpointV1} the offset recorded for each store's changelog
     * partition is parsed as a {@link KafkaChangelogSSPOffset}; stores with a blank or missing
     * offset are omitted.
     *
     * @param checkpoint last checkpoint; may be {@code null}
     * @return store name to marker; empty when {@code checkpoint} is {@code null}
     * @throws SamzaException if the checkpoint is neither V1 nor V2
     */
    private Map<String, KafkaStateCheckpointMarker> getCheckpointedChangelogOffsets(Checkpoint checkpoint) {
        Map<String, KafkaStateCheckpointMarker> checkpointedChangelogOffsets = new HashMap<>();
        if (checkpoint == null) {
            return checkpointedChangelogOffsets;
        }

        if (checkpoint instanceof CheckpointV2) {
            Map<String, Map<String, String>> markersByFactory = ((CheckpointV2) checkpoint).getStateCheckpointMarkers();
            if (markersByFactory.containsKey(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME)) {
                markersByFactory.get(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME)
                        .forEach((storeName, serializedMarker) ->
                                checkpointedChangelogOffsets.put(storeName, KafkaStateCheckpointMarker.deserialize(serializedMarker)));
            }
        } else if (checkpoint instanceof CheckpointV1) {
            // V1 checkpoints keep the changelog offset alongside the input offsets, keyed by changelog SSP.
            Map<SystemStreamPartition, String> checkpointedOffsets = checkpoint.getOffsets();
            this.storeChangelogs.forEach((storeName, changelog) -> {
                SystemStreamPartition changelogSSP =
                        new SystemStreamPartition(changelog, this.taskModel.getChangelogPartition());
                String checkpointedOffset = checkpointedOffsets.get(changelogSSP);
                if (StringUtils.isNotBlank(checkpointedOffset)) {
                    String changelogOffset = KafkaChangelogSSPOffset.fromString(checkpointedOffset).getChangelogOffset();
                    checkpointedChangelogOffsets.put(storeName, new KafkaStateCheckpointMarker(changelogSSP, changelogOffset));
                }
            });
        } else {
            throw new SamzaException("Unsupported checkpoint version: " + ((int) checkpoint.getVersion()));
        }
        return checkpointedChangelogOffsets;
    }

    /**
     * Determines the id of the given checkpoint.
     *
     * <p>A {@link CheckpointV2} carries its id directly. For a {@link CheckpointV1} the id is taken
     * from the first store changelog partition that has a non-blank checkpointed offset.
     *
     * @param checkpoint last checkpoint; may be {@code null}
     * @return the checkpoint id, or {@code null} if there is no checkpoint or no V1 changelog
     *         offset to read it from
     * @throws SamzaException if the checkpoint is neither V1 nor V2
     */
    private CheckpointId getCheckpointId(Checkpoint checkpoint) {
        if (checkpoint == null) {
            return null;
        }

        if (checkpoint instanceof CheckpointV1) {
            for (SystemStream changelog : this.storeChangelogs.values()) {
                SystemStreamPartition changelogSSP =
                        new SystemStreamPartition(changelog, this.taskModel.getChangelogPartition());
                String checkpointedOffset = (String) checkpoint.getOffsets().get(changelogSSP);
                if (StringUtils.isNotBlank(checkpointedOffset)) {
                    return KafkaChangelogSSPOffset.fromString(checkpointedOffset).getCheckpointId();
                }
            }
            return null;
        }

        if (checkpoint instanceof CheckpointV2) {
            return ((CheckpointV2) checkpoint).getCheckpointId();
        }

        throw new SamzaException("Unsupported checkpoint version: " + ((int) checkpoint.getVersion()));
    }
}
