package org.apache.samza.storage;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.util.StreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/storage/ChangelogStreamManager.class */
public class ChangelogStreamManager {
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogStreamManager.class);
    private final MetadataStore metadataStore;
    private final CoordinatorStreamValueSerde valueSerde = new CoordinatorStreamValueSerde(SetChangelogMapping.TYPE);

    public ChangelogStreamManager(MetadataStore metadataStore) {
        this.metadataStore = metadataStore;
    }

    public Map<TaskName, Integer> readPartitionMapping() {
        LOG.debug("Reading changelog partition information");
        HashMap hashMap = new HashMap();
        this.metadataStore.all().forEach((str, bArr) -> {
            String m96fromBytes = this.valueSerde.m96fromBytes(bArr);
            LOG.debug("TaskName: {} is mapped to {}", str, m96fromBytes);
            if (StringUtils.isNotBlank(m96fromBytes)) {
                hashMap.put(new TaskName(str), Integer.valueOf(m96fromBytes));
            }
        });
        return hashMap;
    }

    public void writePartitionMapping(Map<TaskName, Integer> map) {
        LOG.debug("Updating changelog information with: ");
        for (Map.Entry<TaskName, Integer> entry : map.entrySet()) {
            Preconditions.checkNotNull(entry.getKey());
            String taskName = entry.getKey().getTaskName();
            if (entry.getValue() != null) {
                String valueOf = String.valueOf(entry.getValue());
                LOG.debug("TaskName: {} to Partition: {}", taskName, entry.getValue());
                this.metadataStore.put(taskName, this.valueSerde.toBytes(valueOf));
            } else {
                LOG.debug("Deleting the TaskName: {}", taskName);
                this.metadataStore.delete(taskName);
            }
        }
        this.metadataStore.flush();
    }

    public void updatePartitionMapping(Map<TaskName, Integer> map, Map<TaskName, Integer> map2) {
        HashMap hashMap = new HashMap(map2);
        hashMap.putAll(map);
        writePartitionMapping(hashMap);
    }

    public static void createChangelogStreams(Config config, int i) {
        StorageConfig storageConfig = new StorageConfig(config);
        ImmutableMap.Builder builder = new ImmutableMap.Builder();
        storageConfig.getStoreNames().forEach(str -> {
            Optional<String> changelogStream = storageConfig.getChangelogStream(str);
            if (changelogStream.isPresent() && StringUtils.isNotBlank(changelogStream.get())) {
                builder.put(str, StreamUtil.getSystemStreamFromNames(changelogStream.get()));
            }
        });
        ImmutableMap build = builder.build();
        SystemConfig systemConfig = new SystemConfig(config);
        build.forEach((str2, systemStream) -> {
            SystemAdmin admin = systemConfig.getSystemFactories().get(systemStream.getSystem()).getAdmin(systemStream.getSystem(), config, ChangelogStreamManager.class.getSimpleName());
            if (admin == null) {
                throw new SamzaException(String.format("Error creating changelog. Changelog on store %s uses system %s, which is missing from the configuration.", str2, systemStream.getSystem()));
            }
            StreamSpec createChangeLogStreamSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream(), systemStream.getSystem(), i);
            admin.start();
            if (admin.createStream(createChangeLogStreamSpec)) {
                LOG.info(String.format("created changelog stream %s.", systemStream.getStream()));
            } else {
                LOG.info(String.format("changelog stream %s already exists.", systemStream.getStream()));
            }
            admin.validateStream(createChangeLogStreamSpec);
            if (storageConfig.getAccessLogEnabled(str2)) {
                String accessLogStream = storageConfig.getAccessLogStream(systemStream.getStream());
                StreamSpec streamSpec = new StreamSpec(accessLogStream, accessLogStream, systemStream.getSystem(), i);
                admin.createStream(streamSpec);
                admin.validateStream(streamSpec);
            }
            admin.stop();
        });
    }
}
