package org.apache.samza.execution;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.StreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/execution/StreamManager.class */
public class StreamManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamManager.class);
    private final SystemAdmins systemAdmins;

    public StreamManager(Config config) {
        this(new SystemAdmins(config, StreamManager.class.getSimpleName()));
    }

    @VisibleForTesting
    StreamManager(SystemAdmins systemAdmins) {
        this.systemAdmins = systemAdmins;
    }

    public void createStreams(List<StreamSpec> list) {
        HashMultimap create = HashMultimap.create();
        list.forEach(streamSpec -> {
            create.put(streamSpec.getSystemName(), streamSpec);
        });
        for (Map.Entry entry : create.asMap().entrySet()) {
            String str = (String) entry.getKey();
            SystemAdmin systemAdmin = this.systemAdmins.getSystemAdmin(str);
            for (StreamSpec streamSpec2 : (Collection) entry.getValue()) {
                LOGGER.info("Creating stream {} with partitions {} on system {}", new Object[]{streamSpec2.getPhysicalName(), Integer.valueOf(streamSpec2.getPartitionCount()), str});
                systemAdmin.createStream(streamSpec2);
            }
        }
    }

    public void start() {
        this.systemAdmins.start();
    }

    public void stop() {
        this.systemAdmins.stop();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<String, Integer> getStreamPartitionCounts(String str, Set<String> set) {
        HashMap hashMap = new HashMap();
        SystemAdmin systemAdmin = this.systemAdmins.getSystemAdmin(str);
        if (systemAdmin == null) {
            throw new SamzaException(String.format("System %s does not exist.", str));
        }
        systemAdmin.getSystemStreamMetadata(set).forEach((str2, systemStreamMetadata) -> {
        });
        return hashMap;
    }

    public void clearStreamsFromPreviousRun(Config config) {
        try {
            LOGGER.info("run.id from previous run is {}", new ApplicationConfig(config).getRunId());
            StreamConfig streamConfig = new StreamConfig(config);
            Stream<String> stream = streamConfig.getStreamIds().stream();
            streamConfig.getClass();
            ((Set) stream.filter(streamConfig::getIsIntermediateStream).map(str -> {
                return new StreamSpec(str, streamConfig.getPhysicalName(str), streamConfig.getSystem(str));
            }).collect(Collectors.toSet())).forEach(streamSpec -> {
                LOGGER.info("Clear intermediate stream {} in system {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
                this.systemAdmins.getSystemAdmin(streamSpec.getSystemName()).clearStream(streamSpec);
            });
            new TaskConfig(config).getCheckpointManager(new MetricsRegistryMap()).ifPresent((v0) -> {
                v0.clearCheckpoints();
            });
            StorageConfig storageConfig = new StorageConfig(config);
            for (String str2 : storageConfig.getStoreNames()) {
                String orElse = storageConfig.getChangelogStream(str2).orElse(null);
                if (orElse != null) {
                    LOGGER.info("Clear store {} changelog {}", str2, orElse);
                    SystemStream systemStreamFromNames = StreamUtil.getSystemStreamFromNames(orElse);
                    StreamSpec createChangeLogStreamSpec = StreamSpec.createChangeLogStreamSpec(systemStreamFromNames.getStream(), systemStreamFromNames.getSystem(), 1);
                    this.systemAdmins.getSystemAdmin(createChangeLogStreamSpec.getSystemName()).clearStream(createChangeLogStreamSpec);
                }
            }
        } catch (Exception e) {
            LOGGER.warn("Fail to clear internal streams from previous run. Please clean up manually.", e);
        }
    }

    public static String createUniqueNameForBatch(String str, Config config) {
        ApplicationConfig applicationConfig = new ApplicationConfig(config);
        return (applicationConfig.getAppMode() != ApplicationConfig.ApplicationMode.BATCH || applicationConfig.getRunId() == null) ? str : str + "-" + applicationConfig.getRunId();
    }
}
