package org.apache.samza.execution;

import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.StatefulOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.util.MathUtils;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerializableSerde;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/execution/JobNode.class */
public class JobNode {
    private static final Logger log = LoggerFactory.getLogger(JobNode.class);
    private static final String CONFIG_JOB_PREFIX = "jobs.%s.";
    private static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan";
    private final String jobName;
    private final String jobId;
    private final String id;
    private final StreamGraphImpl streamGraph;
    private final List<StreamEdge> inEdges = new ArrayList();
    private final List<StreamEdge> outEdges = new ArrayList();
    private final Config config;

    /* JADX INFO: Access modifiers changed from: package-private */
    public JobNode(String str, String str2, StreamGraphImpl streamGraphImpl, Config config) {
        this.jobName = str;
        this.jobId = str2;
        this.id = createId(str, str2);
        this.streamGraph = streamGraphImpl;
        this.config = config;
    }

    public StreamGraphImpl getStreamGraph() {
        return this.streamGraph;
    }

    public String getId() {
        return this.id;
    }

    public String getJobName() {
        return this.jobName;
    }

    public String getJobId() {
        return this.jobId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addInEdge(StreamEdge streamEdge) {
        this.inEdges.add(streamEdge);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addOutEdge(StreamEdge streamEdge) {
        this.outEdges.add(streamEdge);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<StreamEdge> getInEdges() {
        return this.inEdges;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<StreamEdge> getOutEdges() {
        return this.outEdges;
    }

    public JobConfig generateConfig(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put(JobConfig.JOB_NAME(), this.jobName);
        hashMap.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join((List) this.inEdges.stream().map(streamEdge -> {
            return streamEdge.getFormattedSystemStream();
        }).collect(Collectors.toList())));
        if (this.streamGraph.hasWindowOrJoins() && "-1".equals(this.config.get(TaskConfig.WINDOW_MS(), "-1"))) {
            long computeTriggerInterval = computeTriggerInterval();
            log.info("Using triggering interval: {} for jobName: {}", Long.valueOf(computeTriggerInterval), this.jobName);
            hashMap.put(TaskConfig.WINDOW_MS(), String.valueOf(computeTriggerInterval));
        }
        this.streamGraph.getAllOperatorSpecs().forEach(operatorSpec -> {
            if (operatorSpec instanceof StatefulOperatorSpec) {
                ((StatefulOperatorSpec) operatorSpec).getStoreDescriptors().forEach(storeDescriptor -> {
                    hashMap.putAll(storeDescriptor.getStorageConfigs());
                });
            }
        });
        hashMap.put(CONFIG_INTERNAL_EXECUTION_PLAN, str);
        this.inEdges.stream().filter((v0) -> {
            return v0.isIntermediate();
        }).forEach(streamEdge2 -> {
            hashMap.putAll(streamEdge2.generateConfig());
        });
        addSerdeConfigs(hashMap);
        log.info("Job {} has generated configs {}", this.jobName, hashMap);
        String format = String.format(CONFIG_JOB_PREFIX, this.jobName);
        HashMap hashMap2 = new HashMap((Map) this.config);
        if (hashMap2.containsKey(TaskConfig.INPUT_STREAMS())) {
            log.warn("Specifying task inputs in configuration is not allowed with Fluent API. Ignoring configured value for " + TaskConfig.INPUT_STREAMS());
            hashMap2.remove(TaskConfig.INPUT_STREAMS());
        }
        log.debug("Job {} has allowed configs {}", this.jobName, hashMap2);
        return new JobConfig(Util.rewriteConfig(extractScopedConfig(new MapConfig(hashMap2), new MapConfig(hashMap), format)));
    }

    void addSerdeConfigs(Map<String, String> map) {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        Map<StreamSpec, InputOperatorSpec> inputOperators = this.streamGraph.getInputOperators();
        this.inEdges.forEach(streamEdge -> {
            String id = streamEdge.getStreamSpec().getId();
            InputOperatorSpec inputOperatorSpec = (InputOperatorSpec) inputOperators.get(streamEdge.getStreamSpec());
            hashMap.put(id, inputOperatorSpec.getKeySerde());
            hashMap2.put(id, inputOperatorSpec.getValueSerde());
        });
        Map<StreamSpec, OutputStreamImpl> outputStreams = this.streamGraph.getOutputStreams();
        this.outEdges.forEach(streamEdge2 -> {
            String id = streamEdge2.getStreamSpec().getId();
            OutputStreamImpl outputStreamImpl = (OutputStreamImpl) outputStreams.get(streamEdge2.getStreamSpec());
            hashMap.put(id, outputStreamImpl.getKeySerde());
            hashMap2.put(id, outputStreamImpl.getValueSerde());
        });
        HashMap hashMap3 = new HashMap();
        HashMap hashMap4 = new HashMap();
        this.streamGraph.getAllOperatorSpecs().forEach(operatorSpec -> {
            if (operatorSpec instanceof StatefulOperatorSpec) {
                ((StatefulOperatorSpec) operatorSpec).getStoreDescriptors().forEach(storeDescriptor -> {
                    hashMap3.put(storeDescriptor.getStoreName(), storeDescriptor.getKeySerde());
                    hashMap4.put(storeDescriptor.getStoreName(), storeDescriptor.getMsgSerde());
                });
            }
        });
        HashSet hashSet = new HashSet(hashMap.values());
        hashSet.addAll(hashMap2.values());
        hashSet.addAll(hashMap3.values());
        hashSet.addAll(hashMap4.values());
        SerializableSerde serializableSerde = new SerializableSerde();
        Base64.Encoder encoder = Base64.getEncoder();
        HashMap hashMap5 = new HashMap();
        hashSet.forEach(serde -> {
            map.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), (String) hashMap5.computeIfAbsent(serde, serde -> {
                return serde.getClass().getSimpleName() + "-" + UUID.randomUUID().toString();
            })), encoder.encodeToString(serializableSerde.toBytes(serde)));
        });
        hashMap.forEach((str, serde2) -> {
            map.put(String.format(StreamConfig.STREAM_ID_PREFIX(), str) + StreamConfig.KEY_SERDE(), hashMap5.get(serde2));
        });
        hashMap2.forEach((str2, serde3) -> {
            map.put(String.format(StreamConfig.STREAM_ID_PREFIX(), str2) + StreamConfig.MSG_SERDE(), hashMap5.get(serde3));
        });
        hashMap3.forEach((str3, serde4) -> {
            map.put(String.format(StorageConfig.KEY_SERDE(), str3), hashMap5.get(serde4));
        });
        hashMap4.forEach((str4, serde5) -> {
            map.put(String.format(StorageConfig.MSG_SERDE(), str4), hashMap5.get(serde5));
        });
    }

    private long computeTriggerInterval() {
        Collection<OperatorSpec> allOperatorSpecs = this.streamGraph.getAllOperatorSpecs();
        List list = (List) allOperatorSpecs.stream().filter(operatorSpec -> {
            return operatorSpec.getOpCode() == OperatorSpec.OpCode.WINDOW;
        }).map(operatorSpec2 -> {
            return Long.valueOf(((WindowOperatorSpec) operatorSpec2).getDefaultTriggerMs());
        }).collect(Collectors.toList());
        ArrayList arrayList = new ArrayList((List) allOperatorSpecs.stream().filter(operatorSpec3 -> {
            return operatorSpec3.getOpCode() == OperatorSpec.OpCode.JOIN;
        }).map(operatorSpec4 -> {
            return Long.valueOf(((JoinOperatorSpec) operatorSpec4).getTtlMs());
        }).collect(Collectors.toList()));
        arrayList.addAll(list);
        return MathUtils.gcd(arrayList);
    }

    private static Config extractScopedConfig(Config config, Config config2, String str) {
        Config[] configArr = {config, config2, config.subset(str)};
        HashMap hashMap = new HashMap();
        for (Config config3 : configArr) {
            for (Map.Entry entry : config3.entrySet()) {
                String str2 = (String) entry.getValue();
                if (str2 != null && !str2.isEmpty()) {
                    hashMap.put(entry.getKey(), entry.getValue());
                }
            }
        }
        MapConfig mapConfig = new MapConfig(hashMap);
        log.debug("Prefix '{}' has merged config {}", str, mapConfig);
        return mapConfig;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static String createId(String str, String str2) {
        return String.format("%s-%s", str, str2);
    }
}
