package org.apache.samza.storage;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SSPMetadataCache;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;

/**
 * {@link StateBackendFactory} that builds the Kafka-changelog backup and restore managers for a
 * task's stores.
 *
 * <p>The factory lazily creates one {@link StreamMetadataCache} and one {@link SSPMetadataCache}
 * and hands the same instances to every restore manager it builds afterwards. The lazy
 * initialization performs no synchronization.
 */
public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
    /** Lazily created by {@link #getStreamCache(SystemAdmins, Clock)}; {@code null} until first use. */
    private StreamMetadataCache streamCache;
    /** Lazily created by {@link #getSspCache(SystemAdmins, Clock, Set)}; {@code null} until first use. */
    private SSPMetadataCache sspCache;

    /**
     * {@link StateBackendAdmin} whose operations do nothing. This is what
     * {@link KafkaChangelogStateBackendFactory#getAdmin(JobModel, Config)} returns.
     */
    public class NoOpKafkaChangelogStateBackendAdmin implements StateBackendAdmin {
        public NoOpKafkaChangelogStateBackendAdmin() {
        }

        /** Intentionally empty. */
        public void createResources() {
        }

        /** Intentionally empty. */
        public void validateResources() {
        }
    }

    /**
     * Creates the backup manager for one task.
     *
     * <p>The store-to-changelog mapping is read from {@link StorageConfig}. A transactional backup
     * manager is returned when {@link TaskConfig#getTransactionalStateCheckpointEnabled()} is
     * true, otherwise a non-transactional one. Only {@code taskModel}, {@code map} and
     * {@code config} are read by this method; the remaining parameters are unused here.
     *
     * @param taskModel supplies the task name and changelog partition
     * @param map system admins, wrapped into a {@link SystemAdmins} (presumably keyed by system
     *     name; confirm against the caller)
     * @param config job configuration
     * @return the backup manager for the task
     */
    public TaskBackupManager getBackupManager(JobContext jobContext, ContainerModel containerModel, TaskModel taskModel, Map<String, SystemAdmin> map, ExecutorService executorService, MetricsRegistry metricsRegistry, Config config, Clock clock, File file, File file2) {
        SystemAdmins systemAdmins = new SystemAdmins(map);
        Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();

        if (new TaskConfig(config).getTransactionalStateCheckpointEnabled()) {
            return new KafkaTransactionalStateTaskBackupManager(
                taskModel.getTaskName(), storeChangelogs, systemAdmins, taskModel.getChangelogPartition());
        }
        return new KafkaNonTransactionalStateTaskBackupManager(
            taskModel.getTaskName(), storeChangelogs, systemAdmins, taskModel.getChangelogPartition());
    }

    /**
     * Creates the restore manager for one task.
     *
     * <p>The configured store changelogs are first narrowed with
     * {@link #filterStandbySystemStreams(Map, ContainerModel)}. A transactional restore manager,
     * backed by the shared {@link SSPMetadataCache}, is returned when
     * {@link TaskConfig#getTransactionalStateRestoreEnabled()} is true. Otherwise a
     * non-transactional one, backed by the shared {@link StreamMetadataCache}, is returned.
     *
     * @param jobContext passed to the restore manager; also supplies the job model's maximum
     *     changelog partition count on the non-transactional path
     * @param containerContext supplies the container model whose tasks determine the changelog
     *     partitions
     * @param taskModel passed to the restore manager
     * @param executorService passed to the restore manager
     * @param metricsRegistry passed to the restore manager
     * @param set passed to the restore manager as its first argument (presumably the names of the
     *     stores to restore; confirm against the caller)
     * @param config job configuration
     * @param clock passed to the caches and to the restore manager
     * @param file passed through to the restore manager
     * @param file2 passed through to the restore manager
     * @param kafkaChangelogRestoreParams supplies system admins, in-memory stores, storage engine
     *     factories, serdes, store consumers and the collector
     * @return the restore manager for the task
     */
    public TaskRestoreManager getRestoreManager(JobContext jobContext, ContainerContext containerContext, TaskModel taskModel, ExecutorService executorService, MetricsRegistry metricsRegistry, Set<String> set, Config config, Clock clock, File file, File file2, KafkaChangelogRestoreParams kafkaChangelogRestoreParams) {
        Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();
        Map<String, SystemStream> activeStoreChangelogs =
            filterStandbySystemStreams(storeChangelogs, containerContext.getContainerModel());
        SystemAdmins systemAdmins = new SystemAdmins((Map<String, SystemAdmin>) kafkaChangelogRestoreParams.getSystemAdmins());

        if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
            // Only the transactional path needs the changelog SSP set; it seeds the SSP cache.
            Set<SystemStreamPartition> changelogSsps = getChangelogSSPForContainer(storeChangelogs, containerContext);
            return new TransactionalStateTaskRestoreManager(
                set,
                jobContext,
                containerContext,
                taskModel,
                executorService,
                activeStoreChangelogs,
                kafkaChangelogRestoreParams.getInMemoryStores(),
                kafkaChangelogRestoreParams.getStorageEngineFactories(),
                kafkaChangelogRestoreParams.getSerdes(),
                systemAdmins,
                kafkaChangelogRestoreParams.getStoreConsumers(),
                metricsRegistry,
                kafkaChangelogRestoreParams.getCollector(),
                getSspCache(systemAdmins, clock, changelogSsps),
                file,
                file2,
                config,
                clock);
        }

        return new NonTransactionalStateTaskRestoreManager(
            set,
            jobContext,
            containerContext,
            taskModel,
            executorService,
            activeStoreChangelogs,
            kafkaChangelogRestoreParams.getInMemoryStores(),
            kafkaChangelogRestoreParams.getStorageEngineFactories(),
            kafkaChangelogRestoreParams.getSerdes(),
            systemAdmins,
            getStreamCache(systemAdmins, clock),
            kafkaChangelogRestoreParams.getStoreConsumers(),
            metricsRegistry,
            kafkaChangelogRestoreParams.getCollector(),
            jobContext.getJobModel().getMaxChangeLogStreamPartitions(),
            file,
            file2,
            config,
            clock);
    }

    /**
     * Returns a new {@link NoOpKafkaChangelogStateBackendAdmin}; both arguments are ignored.
     */
    public StateBackendAdmin getAdmin(JobModel jobModel, Config config) {
        return new NoOpKafkaChangelogStateBackendAdmin();
    }

    /**
     * Builds the set of changelog partitions for a container: one {@link SystemStreamPartition}
     * for every combination of a changelog stream in {@code map} and the changelog partition of
     * a task in the container model.
     *
     * @param map store name to changelog stream; only the values are used
     * @param containerContext supplies the container model and its tasks
     * @return the resulting partitions, with duplicates collapsed
     */
    public Set<SystemStreamPartition> getChangelogSSPForContainer(Map<String, SystemStream> map, ContainerContext containerContext) {
        return map.values().stream()
            .flatMap(changelogStream -> containerContext.getContainerModel().getTasks().values().stream()
                .map(task -> new SystemStreamPartition(changelogStream, task.getChangelogPartition())))
            .collect(Collectors.toSet());
    }

    /**
     * Returns the shared {@link StreamMetadataCache}, creating it on first call. Once the cache
     * exists, the arguments of later calls are ignored.
     */
    @VisibleForTesting
    StreamMetadataCache getStreamCache(SystemAdmins systemAdmins, Clock clock) {
        if (this.streamCache == null) {
            // NOTE(review): 5000 is presumably the cache TTL in milliseconds; confirm against
            // the StreamMetadataCache constructor.
            this.streamCache = new StreamMetadataCache(systemAdmins, 5000, clock);
        }
        return this.streamCache;
    }

    /**
     * Returns the shared {@link SSPMetadataCache}, creating it on first call with a five-second
     * {@link Duration} and the given partitions. Once the cache exists, the arguments of later
     * calls (including {@code set}) are ignored.
     */
    private SSPMetadataCache getSspCache(SystemAdmins systemAdmins, Clock clock, Set<SystemStreamPartition> set) {
        if (this.sspCache == null) {
            this.sspCache = new SSPMetadataCache(systemAdmins, Duration.ofSeconds(5L), clock, set);
        }
        return this.sspCache;
    }

    /**
     * Narrows a store-to-changelog mapping to the stores that still have a changelog partition
     * after the partitions of the container's standby tasks are removed.
     *
     * <p>Every (changelog stream, task changelog partition) pair in the container is first mapped
     * to its store name. The pairs whose partition is the changelog partition of a task in
     * {@link TaskMode#Standby} are then removed. A store appears in the result only if at least
     * one of its pairs remains.
     *
     * @param map store name to changelog stream
     * @param containerModel supplies the tasks, their modes and changelog partitions
     * @return store name to changelog stream for the stores that remain; never {@code null}
     */
    @VisibleForTesting
    Map<String, SystemStream> filterStandbySystemStreams(Map<String, SystemStream> map, ContainerModel containerModel) {
        // Index every changelog partition of every task in the container by its store name.
        Map<SystemStreamPartition, String> changelogSspToStore = new HashMap<>();
        map.forEach((storeName, systemStream) ->
            containerModel.getTasks().forEach((taskName, taskModel) ->
                changelogSspToStore.put(
                    new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()), storeName)));

        // Drop the changelog partitions that belong to standby tasks.
        containerModel.getTasks().values().stream()
            .filter(taskModel -> taskModel.getTaskMode().equals(TaskMode.Standby))
            .forEach(standbyTask -> map.values().forEach(systemStream ->
                changelogSspToStore.remove(
                    new SystemStreamPartition(systemStream, standbyTask.getChangelogPartition()))));

        // Flip back to store -> partition, then keep only the stream part of each partition.
        return MapUtils.invertMap(changelogSspToStore).entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getSystemStream()));
    }
}
