package org.apache.samza.container.grouper.stream;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.task.GrouperMetadata;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/container/grouper/stream/SSPGrouperProxy.class */
public class SSPGrouperProxy {
    private static final Logger LOGGER = LoggerFactory.getLogger(SSPGrouperProxy.class);
    private final Config config;
    private final SystemStreamPartitionGrouper grouper;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/container/grouper/stream/SSPGrouperProxy$PartitionGroup.class */
    public static class PartitionGroup {
        private TaskName taskName;
        private Set<SystemStreamPartition> systemStreamPartitions;

        PartitionGroup(TaskName taskName, Collection<SystemStreamPartition> collection) {
            Preconditions.checkNotNull(taskName);
            Preconditions.checkNotNull(collection);
            this.taskName = taskName;
            this.systemStreamPartitions = new HashSet(collection);
        }

        void removeSSP(SystemStreamPartition systemStreamPartition) {
            this.systemStreamPartitions.remove(systemStreamPartition);
        }

        void addSSP(SystemStreamPartition systemStreamPartition) {
            this.systemStreamPartitions.add(systemStreamPartition);
        }
    }

    public SSPGrouperProxy(Config config, SystemStreamPartitionGrouper systemStreamPartitionGrouper) {
        Preconditions.checkNotNull(config);
        Preconditions.checkNotNull(systemStreamPartitionGrouper);
        this.config = config;
        this.grouper = systemStreamPartitionGrouper;
    }

    public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> set, GrouperMetadata grouperMetadata) {
        Map<TaskName, Set<SystemStreamPartition>> group = this.grouper.group(set);
        int numPersistentStores = new StorageConfig(this.config).getNumPersistentStores();
        if (numPersistentStores <= 0) {
            LOGGER.info("Application is stateless with no stores. Using the result from the group method directly from {}.", this.grouper.getClass().getName());
            return group;
        }
        LOGGER.info("Application is stateful with {} stores. Proceeding with the SSP Grouper Proxy.", Integer.valueOf(numPersistentStores));
        if (grouperMetadata.getPreviousTaskToSSPAssignment().isEmpty()) {
            LOGGER.info("Previous task to partition assignment does not exist. Using the result from the group method directly from {}.", this.grouper.getClass().getName());
            return group;
        }
        Map<TaskName, List<SystemStreamPartition>> hashMap = new HashMap<>();
        for (Map.Entry<TaskName, Set<SystemStreamPartition>> entry : group.entrySet()) {
            hashMap.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        Set<SystemStreamPartition> broadcastSystemStreamPartitions = new TaskConfig(this.config).getBroadcastSystemStreamPartitions();
        Map<SystemStreamPartition, TaskName> previousSSPToTaskMapping = getPreviousSSPToTaskMapping(grouperMetadata, broadcastSystemStreamPartitions);
        HashMap hashMap2 = new HashMap();
        hashMap.forEach((taskName, list) -> {
        });
        Map<SystemStream, Integer> systemStreamToPartitionCount = getSystemStreamToPartitionCount(grouperMetadata.getPreviousTaskToSSPAssignment());
        Map<SystemStream, Integer> systemStreamToPartitionCount2 = getSystemStreamToPartitionCount(hashMap);
        SystemStreamPartitionMapper systemStreamPartitionMapper = getSystemStreamPartitionMapper(this.config);
        try {
            for (Map.Entry<TaskName, List<SystemStreamPartition>> entry2 : hashMap.entrySet()) {
                TaskName key = entry2.getKey();
                for (SystemStreamPartition systemStreamPartition : entry2.getValue()) {
                    if (broadcastSystemStreamPartitions.contains(systemStreamPartition)) {
                        LOGGER.info("SystemStreamPartition: {} is part of broadcast stream. Skipping reassignment.", systemStreamPartition);
                    } else {
                        SystemStream systemStream = systemStreamPartition.getSystemStream();
                        Integer orDefault = systemStreamToPartitionCount.getOrDefault(systemStream, 0);
                        Integer orDefault2 = systemStreamToPartitionCount2.getOrDefault(systemStream, 0);
                        TaskName taskName2 = null;
                        if (orDefault.intValue() > 0 && !orDefault2.equals(orDefault)) {
                            LOGGER.info("Partition count of system stream: {} had changed from: {} to: {} partitions.", new Object[]{systemStream, orDefault, orDefault2});
                            taskName2 = previousSSPToTaskMapping.get(systemStreamPartitionMapper.getPreviousSSP(systemStreamPartition, orDefault.intValue(), orDefault2.intValue()));
                        } else if (previousSSPToTaskMapping.containsKey(systemStreamPartition)) {
                            taskName2 = previousSSPToTaskMapping.get(systemStreamPartition);
                        }
                        if (taskName2 != null && !Objects.equals(taskName2, key)) {
                            LOGGER.info("Moving systemStreamPartition: {} from task: {} to task: {}.", new Object[]{systemStreamPartition, key, taskName2});
                            ((PartitionGroup) hashMap2.get(key)).removeSSP(systemStreamPartition);
                            ((PartitionGroup) hashMap2.get(taskName2)).addSSP(systemStreamPartition);
                        }
                    }
                }
            }
            HashMap hashMap3 = new HashMap();
            for (PartitionGroup partitionGroup : hashMap2.values()) {
                if (!partitionGroup.systemStreamPartitions.isEmpty()) {
                    hashMap3.put(partitionGroup.taskName, partitionGroup.systemStreamPartitions);
                }
            }
            return hashMap3;
        } catch (Exception e) {
            throw new SamzaException(String.format("Error in partition-to-task assignment via the SSPGroupProxy. To disable the SSPGroupProxy, set config %s = false, only if you cannot address the root cause as shown in the underlying exception.", JobConfig.SSP_INPUT_EXPANSION_ENABLED), e);
        }
    }

    private Map<SystemStream, Integer> getSystemStreamToPartitionCount(Map<TaskName, List<SystemStreamPartition>> map) {
        HashMap hashMap = new HashMap();
        map.forEach((taskName, list) -> {
            list.forEach(systemStreamPartition -> {
                SystemStream systemStream = systemStreamPartition.getSystemStream();
                hashMap.put(systemStream, Integer.valueOf(((Integer) hashMap.getOrDefault(systemStream, 0)).intValue() + 1));
            });
        });
        return hashMap;
    }

    private static Map<SystemStreamPartition, TaskName> getPreviousSSPToTaskMapping(GrouperMetadata grouperMetadata, Set<SystemStreamPartition> set) {
        HashMap hashMap = new HashMap();
        grouperMetadata.getPreviousTaskToSSPAssignment().forEach((taskName, list) -> {
            list.forEach(systemStreamPartition -> {
                if (set.contains(systemStreamPartition)) {
                    return;
                }
                hashMap.put(systemStreamPartition, taskName);
            });
        });
        return hashMap;
    }

    private SystemStreamPartitionMapper getSystemStreamPartitionMapper(Config config) {
        return ((SystemStreamPartitionMapperFactory) ReflectionUtil.getObj(new JobConfig(config).getSystemStreamPartitionMapperFactoryName(), SystemStreamPartitionMapperFactory.class)).getStreamPartitionMapper(config, new MetricsRegistryMap());
    }
}
