package org.apache.samza.execution;

import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.StatefulOperatorSpec;
import org.apache.samza.operators.spec.StoreDescriptor;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerializableSerde;
import org.apache.samza.table.TableConfigGenerator;
import org.apache.samza.table.descriptors.LocalTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.util.ConfigUtil;
import org.apache.samza.util.MathUtil;
import org.apache.samza.util.StreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/execution/JobNodeConfigurationGenerator.class */
/**
 * Generates the {@link JobConfig} for a single {@link JobNode}.
 *
 * <p>For a non-legacy job the generated configuration covers: job name/id, task inputs and broadcast
 * inputs, the window trigger interval, store configs, intermediate stream configs, serialized serde
 * instances with their stream/store/table bindings, and table configs. The generated entries are then
 * merged with the job node's own config; on conflicting keys the job node's own value wins.
 */
class JobNodeConfigurationGenerator {
    private static final Logger LOG = LoggerFactory.getLogger(JobNodeConfigurationGenerator.class);

    /** Config key under which the execution plan string given to {@link #generateJobConfig} is stored. */
    static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan";

    /**
     * Merges the original (user supplied) config with the generated config and passes the result through
     * {@link ConfigUtil#rewriteConfig}.
     *
     * <p>Every entry of {@code originalConfig} overrides the entry with the same key in
     * {@code generatedConfig}; an info message is logged when the overridden value actually differs.
     * Neither input map is modified.
     *
     * @param originalConfig the original config; its values take precedence
     * @param generatedConfig the generated config
     * @return the rewritten merged config
     * @throws SamzaException if job.name or job.id is present in both maps with different values
     *     (see {@link #validateJobConfigs})
     */
    public static Config mergeConfig(Map<String, String> originalConfig, Map<String, String> generatedConfig) {
        validateJobConfigs(originalConfig, generatedConfig);
        Map<String, String> mergedConfig = new HashMap<>(generatedConfig);
        originalConfig.forEach((key, originalValue) -> {
            if (generatedConfig.containsKey(key) && !Objects.equals(generatedConfig.get(key), originalValue)) {
                LOG.info("Replacing generated config for key: {} value: {} with original config value: {}",
                    key, generatedConfig.get(key), originalValue);
            }
            mergedConfig.put(key, originalValue);
        });
        return ConfigUtil.rewriteConfig(new MapConfig(mergedConfig));
    }

    /**
     * Verifies that job.name and job.id agree between the original and the generated config.
     *
     * <p>A key is only checked when it is non-null in both maps. The job name is checked before the job id.
     *
     * @param originalConfig the original (user supplied) config
     * @param generatedConfig the generated config
     * @throws SamzaException if either key is set in both maps to different values
     */
    static void validateJobConfigs(Map<String, String> originalConfig, Map<String, String> generatedConfig) {
        String userJobId = originalConfig.get(JobConfig.JOB_ID);
        String userJobName = originalConfig.get(JobConfig.JOB_NAME);
        String generatedJobId = generatedConfig.get(JobConfig.JOB_ID);
        String generatedJobName = generatedConfig.get(JobConfig.JOB_NAME);
        if (generatedJobName != null && userJobName != null && !StringUtils.equals(generatedJobName, userJobName)) {
            throw new SamzaException(String.format("Generated job.name = %s from app.name = %s does not match user configured job.name = %s, please configure job.name same as app.name", generatedJobName, originalConfig.get(ApplicationConfig.APP_NAME), userJobName));
        }
        if (generatedJobId != null && userJobId != null && !StringUtils.equals(generatedJobId, userJobId)) {
            throw new SamzaException(String.format("Generated job.id = %s from app.id = %s does not match user configured job.id = %s, please configure job.id same as app.id", generatedJobId, originalConfig.get(ApplicationConfig.APP_ID), userJobId));
        }
    }

    /**
     * Generates the full job config for the given job node.
     *
     * <p>For a legacy task application the node's own config is wrapped and returned unchanged. Otherwise
     * the config entries described on this class are generated and merged with the node's own config via
     * {@link #mergeConfig}.
     *
     * @param jobNode the job node to generate the config for
     * @param executionPlan the execution plan string, stored under {@link #CONFIG_INTERNAL_EXECUTION_PLAN}
     * @return the job config for {@code jobNode}
     */
    public JobConfig generateJobConfig(JobNode jobNode, String executionPlan) {
        if (jobNode.isLegacyTaskApplication()) {
            return new JobConfig(jobNode.getConfig());
        }

        Map<String, String> generatedConfig = new HashMap<>();
        generatedConfig.put(JobConfig.JOB_NAME, jobNode.getJobName());
        generatedConfig.put(JobConfig.JOB_ID, jobNode.getJobId());

        Map<String, StreamEdge> inEdges = jobNode.getInEdges();
        Map<String, StreamEdge> outEdges = jobNode.getOutEdges();
        Collection<OperatorSpec> reachableOperators = jobNode.getReachableOperators();
        List<StoreDescriptor> stores = getStoreDescriptors(reachableOperators);
        Map<String, TableDescriptor> reachableTables = getReachableTables(reachableOperators, jobNode);
        Config originalConfig = jobNode.getConfig();

        // Split the input edges into regular task inputs and broadcast inputs. A broadcast input is
        // written as "<name>#[0-<lastPartition>]", or "<name>#0" when it has at most one partition.
        Set<String> inputs = new HashSet<>();
        Set<String> broadcastInputs = new HashSet<>();
        for (StreamEdge inEdge : inEdges.values()) {
            String edgeName = inEdge.getName();
            if (!inEdge.isBroadcast()) {
                inputs.add(edgeName);
            } else if (inEdge.getPartitionCount() > 1) {
                broadcastInputs.add(edgeName + "#[0-" + (inEdge.getPartitionCount() - 1) + "]");
            } else {
                broadcastInputs.add(edgeName + "#0");
            }
        }

        configureBroadcastInputs(generatedConfig, originalConfig, broadcastInputs);
        configureWindowInterval(generatedConfig, originalConfig, reachableOperators);
        stores.forEach(store -> generatedConfig.putAll(store.getStorageConfigs()));
        generatedConfig.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlan);
        inEdges.values().stream()
            .filter(StreamEdge::isIntermediate)
            .forEach(edge -> generatedConfig.putAll(edge.generateConfig()));
        configureSerdes(generatedConfig, inEdges, outEdges, stores, reachableTables.keySet(), jobNode);
        // configureTables may add side-input streams to "inputs", so it must run before the inputs are joined.
        configureTables(generatedConfig, originalConfig, reachableTables, inputs);
        generatedConfig.put(TaskConfig.INPUT_STREAMS, Joiner.on(',').join(inputs));

        LOG.info("Job {} has generated configs {}", jobNode.getJobNameAndId(), generatedConfig);
        return new JobConfig(mergeConfig(originalConfig, generatedConfig));
    }

    /**
     * Returns the tables of the given job node.
     *
     * @param reachableOperators currently unused; kept so the signature stays stable
     * @param jobNode the job node whose tables are returned
     * @return the result of {@link JobNode#getTables()}
     */
    private Map<String, TableDescriptor> getReachableTables(Collection<OperatorSpec> reachableOperators, JobNode jobNode) {
        return jobNode.getTables();
    }

    /**
     * Writes the broadcast input setting into {@code configs}.
     *
     * <p>Does nothing when {@code broadcastStreams} is empty. Otherwise a non-blank broadcast setting from
     * {@code config} is added to {@code broadcastStreams} (the passed set is mutated) and the
     * comma-joined set is stored under {@link TaskConfig#BROADCAST_INPUT_STREAMS}.
     *
     * @param configs the generated config to write into
     * @param config the original config to read an existing broadcast setting from
     * @param broadcastStreams the generated broadcast stream specs
     */
    private void configureBroadcastInputs(Map<String, String> configs, Config config, Set<String> broadcastStreams) {
        if (broadcastStreams.isEmpty()) {
            return;
        }
        String configuredBroadcasts = config.get(TaskConfig.BROADCAST_INPUT_STREAMS);
        if (StringUtils.isNotBlank(configuredBroadcasts)) {
            broadcastStreams.add(configuredBroadcasts);
        }
        configs.put(TaskConfig.BROADCAST_INPUT_STREAMS, Joiner.on(',').join(broadcastStreams));
    }

    /**
     * Sets {@link TaskConfig#WINDOW_MS} when at least one reachable operator has op code WINDOW or JOIN.
     *
     * @param configs the generated config to write into
     * @param config the original config (not read by this method)
     * @param reachableOperators the operators to inspect
     */
    private void configureWindowInterval(Map<String, String> configs, Config config, Collection<OperatorSpec> reachableOperators) {
        boolean hasWindowOrJoin = reachableOperators.stream().anyMatch(op ->
            op.getOpCode() == OperatorSpec.OpCode.WINDOW || op.getOpCode() == OperatorSpec.OpCode.JOIN);
        if (!hasWindowOrJoin) {
            return;
        }
        long triggerInterval = computeTriggerInterval(reachableOperators);
        LOG.info("Using triggering interval: {}", triggerInterval);
        configs.put(TaskConfig.WINDOW_MS, String.valueOf(triggerInterval));
    }

    /**
     * Computes the trigger interval as the GCD of all join TTLs and all window default trigger intervals.
     *
     * <p>Joins are selected by type ({@link JoinOperatorSpec}); windows are selected by op code WINDOW and
     * then cast to {@link WindowOperatorSpec}.
     *
     * @param reachableOperators the operators to inspect
     * @return the GCD of the collected intervals, or {@code -1} when there are none
     */
    private long computeTriggerInterval(Collection<OperatorSpec> reachableOperators) {
        List<Long> windowTriggerIntervals = reachableOperators.stream()
            .filter(op -> op.getOpCode() == OperatorSpec.OpCode.WINDOW)
            .map(op -> ((WindowOperatorSpec) op).getDefaultTriggerMs())
            .collect(Collectors.toList());
        List<Long> candidateIntervals = reachableOperators.stream()
            .filter(op -> op instanceof JoinOperatorSpec)
            .map(op -> ((JoinOperatorSpec) op).getTtlMs())
            .collect(Collectors.toCollection(ArrayList::new));
        candidateIntervals.addAll(windowTriggerIntervals);
        if (candidateIntervals.isEmpty()) {
            return -1L;
        }
        return MathUtil.gcd(candidateIntervals);
    }

    /**
     * Collects the store descriptors of every {@link StatefulOperatorSpec} among the given operators.
     *
     * @param reachableOperators the operators to inspect
     * @return all store descriptors, in encounter order
     */
    private List<StoreDescriptor> getStoreDescriptors(Collection<OperatorSpec> reachableOperators) {
        return reachableOperators.stream()
            .filter(op -> op instanceof StatefulOperatorSpec)
            .flatMap(op -> ((StatefulOperatorSpec) op).getStoreDescriptors().stream())
            .collect(Collectors.toList());
    }

    /**
     * Adds table configs and registers the side inputs of local tables.
     *
     * <p>The output of {@link TableConfigGenerator#generate} is added to {@code configs}. Then, for each
     * {@link LocalTableDescriptor} with a non-empty side-input list, every side input is resolved to a
     * system stream, its name is added to {@code inputs}, and the stream is flagged as bootstrap.
     *
     * @param configs the generated config to write into
     * @param config the original config, used to resolve side-input names or ids
     * @param tables the tables to configure, keyed as returned by the job node
     * @param inputs the task input stream names; side inputs are added to this set
     */
    private void configureTables(Map<String, String> configs, Config config, Map<String, TableDescriptor> tables, Set<String> inputs) {
        configs.putAll(TableConfigGenerator.generate(new MapConfig(configs), new ArrayList<>(tables.values())));
        tables.values().forEach(tableDescriptor -> {
            if (!(tableDescriptor instanceof LocalTableDescriptor)) {
                return;
            }
            // The cast target is used as a raw type, so the element type of the returned list may be
            // erased; side inputs are handled as stream names/ids (strings) below.
            @SuppressWarnings("unchecked")
            List<String> sideInputs = ((LocalTableDescriptor) tableDescriptor).getSideInputs();
            if (sideInputs == null || sideInputs.isEmpty()) {
                return;
            }
            sideInputs.stream()
                .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(config, sideInput))
                .forEach(systemStream -> {
                    inputs.add(StreamUtil.getNameFromSystemStream(systemStream));
                    configs.put(String.format("systems.%s.streams.%s.samza.bootstrap", systemStream.getSystem(), systemStream.getStream()), "true");
                });
        });
    }

    /**
     * Serializes every distinct serde used by streams, stores and tables into {@code configs} and binds
     * each stream/store/table key and message serde to the generated serde name.
     *
     * <p>Each distinct serde gets a name of the form {@code <SimpleClassName>-<random UUID>}; its
     * Base64-encoded serialized form is stored with {@code putIfAbsent}. {@link NoOpSerde} instances and
     * null serdes of streams and tables are skipped (see {@link #addSerdes}).
     *
     * @param configs the generated config to write into
     * @param inEdges input edges keyed by stream id
     * @param outEdges output edges keyed by stream id
     * @param stores store descriptors providing store names and serdes
     * @param tableIds ids of the tables whose serdes are looked up on {@code jobNode}
     * @param jobNode source of the stream and table serdes
     */
    private void configureSerdes(Map<String, String> configs, Map<String, StreamEdge> inEdges, Map<String, StreamEdge> outEdges, List<StoreDescriptor> stores, Collection<String> tableIds, JobNode jobNode) {
        // Collect key/message serdes per stream, store and table.
        Map<String, Serde> streamKeySerdes = new HashMap<>();
        Map<String, Serde> streamMsgSerdes = new HashMap<>();
        inEdges.keySet().forEach(streamId ->
            addSerdes(jobNode.getInputSerdes(streamId), streamId, streamKeySerdes, streamMsgSerdes));
        outEdges.keySet().forEach(streamId ->
            addSerdes(jobNode.getOutputSerde(streamId), streamId, streamKeySerdes, streamMsgSerdes));

        Map<String, Serde> storeKeySerdes = new HashMap<>();
        Map<String, Serde> storeMsgSerdes = new HashMap<>();
        stores.forEach(store -> {
            storeKeySerdes.put(store.getStoreName(), store.getKeySerde());
            storeMsgSerdes.put(store.getStoreName(), store.getMsgSerde());
        });

        Map<String, Serde> tableKeySerdes = new HashMap<>();
        Map<String, Serde> tableMsgSerdes = new HashMap<>();
        tableIds.forEach(tableId ->
            addSerdes(jobNode.getTableSerdes(tableId), tableId, tableKeySerdes, tableMsgSerdes));

        // De-duplicate so every distinct serde is serialized and named exactly once.
        Set<Serde> distinctSerdes = new HashSet<>(streamKeySerdes.values());
        distinctSerdes.addAll(streamMsgSerdes.values());
        distinctSerdes.addAll(storeKeySerdes.values());
        distinctSerdes.addAll(storeMsgSerdes.values());
        distinctSerdes.addAll(tableKeySerdes.values());
        distinctSerdes.addAll(tableMsgSerdes.values());

        SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
        Base64.Encoder base64Encoder = Base64.getEncoder();
        Map<Serde, String> serdeNames = new HashMap<>();
        distinctSerdes.forEach(serde -> {
            // The mapping function receives the same serde as its key; the parameter has its own name
            // because a lambda parameter may not redeclare a variable of the enclosing lambda.
            String serdeName = serdeNames.computeIfAbsent(serde,
                key -> key.getClass().getSimpleName() + "-" + UUID.randomUUID().toString());
            configs.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE, serdeName),
                base64Encoder.encodeToString(serializableSerde.toBytes(serde)));
        });

        // Bind streams, stores and tables to the generated serde names.
        streamKeySerdes.forEach((streamId, serde) ->
            configs.put(String.format(StreamConfig.STREAM_ID_PREFIX, streamId) + StreamConfig.KEY_SERDE, serdeNames.get(serde)));
        streamMsgSerdes.forEach((streamId, serde) ->
            configs.put(String.format(StreamConfig.STREAM_ID_PREFIX, streamId) + StreamConfig.MSG_SERDE, serdeNames.get(serde)));
        storeKeySerdes.forEach((storeName, serde) ->
            configs.put(String.format(StorageConfig.KEY_SERDE, storeName), serdeNames.get(serde)));
        storeMsgSerdes.forEach((storeName, serde) ->
            configs.put(String.format(StorageConfig.MSG_SERDE, storeName), serdeNames.get(serde)));
        // Table serdes are written under the same storage serde key formats, using the table id.
        tableKeySerdes.forEach((tableId, serde) ->
            configs.put(String.format(StorageConfig.KEY_SERDE, tableId), serdeNames.get(serde)));
        tableMsgSerdes.forEach((tableId, serde) ->
            configs.put(String.format(StorageConfig.MSG_SERDE, tableId), serdeNames.get(serde)));
    }

    /**
     * Records the key and value serde of {@code serdes} under {@code id}.
     *
     * <p>A null pair is ignored entirely; a null serde or a {@link NoOpSerde} is not recorded.
     *
     * @param serdes the key/value serde pair, may be null
     * @param id the stream or table id the serdes belong to
     * @param keySerdeMap receives the key serde
     * @param msgSerdeMap receives the value (message) serde
     */
    private void addSerdes(KV<Serde, Serde> serdes, String id, Map<String, Serde> keySerdeMap, Map<String, Serde> msgSerdeMap) {
        if (serdes == null) {
            return;
        }
        Serde keySerde = serdes.getKey();
        if (keySerde != null && !(keySerde instanceof NoOpSerde)) {
            keySerdeMap.put(id, keySerde);
        }
        Serde msgSerde = serdes.getValue();
        if (msgSerde != null && !(msgSerde instanceof NoOpSerde)) {
            msgSerdeMap.put(id, msgSerde);
        }
    }
}
