package org.apache.samza.operators;

import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OperatorSpecs;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.StreamSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/operators/StreamGraphImpl.class */
/**
 * {@link StreamGraph} implementation that records the input operators and output streams
 * requested by an application, keyed by the {@link StreamSpec} resolved through the
 * {@link ApplicationRunner}, and hands out unique operator IDs.
 *
 * <p>Input operators and output streams are kept in insertion order.
 */
public class StreamGraphImpl implements StreamGraph {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphImpl.class);

    /** Characters accepted in a user-supplied operator ID: digits, word characters, '-', '_' and '.'. */
    private static final Pattern USER_DEFINED_ID_PATTERN = Pattern.compile("[\\d\\w-_.]+");

    private static final String SERDE_MISMATCH_MESSAGE =
            "Stream %s is being used both as an input and an output stream. Serde in Samza happens at stream level, so the same key and message Serde must be used for both.";

    private final ApplicationRunner runner;
    private final Config config;

    /** Input operators by stream spec; insertion-ordered. */
    private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>();

    /** Output streams by stream spec; insertion-ordered. */
    private final Map<StreamSpec, OutputStreamImpl> outputStreams = new LinkedHashMap<>();

    /** Numeric suffix used for the next generated operator ID when no user-defined ID is given. */
    private int nextOpNum = 0;

    /** Every operator ID handed out so far, used to reject duplicates. */
    private final Set<String> operatorIds = new HashSet<>();

    /** Serde used when a stream is requested without an explicit serde; defaults to a no-op key/value serde. */
    private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new NoOpSerde());

    private ContextManager contextManager = null;

    /**
     * Creates an empty graph.
     *
     * @param applicationRunner runner used to resolve stream IDs to {@link StreamSpec}s
     * @param config job configuration; the job name and job ID are read from it when generating operator IDs
     */
    public StreamGraphImpl(ApplicationRunner applicationRunner, Config config) {
        this.runner = applicationRunner;
        this.config = config;
    }

    /**
     * Replaces the default serde used by the single-argument stream getters.
     *
     * @param serde the new default serde
     * @throws NullPointerException if {@code serde} is null
     * @throws IllegalStateException if any input or output stream has already been created
     */
    public void setDefaultSerde(Serde<?> serde) {
        Preconditions.checkNotNull(serde, "Default serde must not be null");
        Preconditions.checkState(this.inputOperators.isEmpty() && this.outputStreams.isEmpty(), "Default serde must be set before creating any input or output streams.");
        this.defaultSerde = serde;
    }

    /**
     * Registers an input operator for the given stream and returns a message stream backed by it.
     *
     * @param str the stream ID
     * @param serde serde for the stream; a {@link KVSerde} marks the stream as keyed
     * @param <M> message type
     * @return the message stream for the newly registered input operator
     * @throws IllegalStateException if no stream spec exists for the ID, if the stream was already
     *         requested as an input, or if it is already an output stream with different serdes
     * @throws NullPointerException if {@code serde} is null
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <M> MessageStream<M> getInputStream(String str, Serde<M> serde) {
        StreamSpec streamSpec = this.runner.getStreamSpec(str);
        Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + str);
        Preconditions.checkNotNull(serde, "serde must not be null for an input stream.");
        Preconditions.checkState(!this.inputOperators.containsKey(streamSpec), "getInputStream must not be called multiple times with the same streamId: " + str);

        KV<Serde, Serde> kvSerdes = getKVSerdes(str, serde);
        OutputStreamImpl existingOutput = this.outputStreams.get(streamSpec);
        if (existingOutput != null) {
            checkSerdesMatch(str, kvSerdes, existingOutput.getKeySerde(), existingOutput.getValueSerde());
        }

        boolean isKeyed = serde instanceof KVSerde;
        String opId = getNextOpId(OperatorSpec.OpCode.INPUT, null);
        InputOperatorSpec inputOperator = OperatorSpecs.createInputOperatorSpec(
                streamSpec, (Serde) kvSerdes.getKey(), (Serde) kvSerdes.getValue(), isKeyed, opId);
        this.inputOperators.put(streamSpec, inputOperator);
        return new MessageStreamImpl(this, this.inputOperators.get(streamSpec));
    }

    /**
     * Same as {@link #getInputStream(String, Serde)} using the current default serde.
     *
     * @param str the stream ID
     * @param <M> message type
     * @return the message stream for the newly registered input operator
     */
    @SuppressWarnings("unchecked")
    public <M> MessageStream<M> getInputStream(String str) {
        // The default serde is stored as Serde<?>; the caller chooses M, so this cast is unchecked.
        return getInputStream(str, (Serde<M>) this.defaultSerde);
    }

    /**
     * Registers and returns an output stream for the given stream ID.
     *
     * @param str the stream ID
     * @param serde serde for the stream; a {@link KVSerde} marks the stream as keyed
     * @param <M> message type
     * @return the newly registered output stream
     * @throws IllegalStateException if no stream spec exists for the ID, if the stream was already
     *         requested as an output, or if it is already an input stream with different serdes
     * @throws NullPointerException if {@code serde} is null
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <M> OutputStream<M> getOutputStream(String str, Serde<M> serde) {
        StreamSpec streamSpec = this.runner.getStreamSpec(str);
        Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + str);
        Preconditions.checkNotNull(serde, "serde must not be null for an output stream.");
        Preconditions.checkState(!this.outputStreams.containsKey(streamSpec), "getOutputStream must not be called multiple times with the same streamId: " + str);

        KV<Serde, Serde> kvSerdes = getKVSerdes(str, serde);
        InputOperatorSpec existingInput = this.inputOperators.get(streamSpec);
        if (existingInput != null) {
            checkSerdesMatch(str, kvSerdes, existingInput.getKeySerde(), existingInput.getValueSerde());
        }

        boolean isKeyed = serde instanceof KVSerde;
        this.outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, (Serde) kvSerdes.getKey(), (Serde) kvSerdes.getValue(), isKeyed));
        return this.outputStreams.get(streamSpec);
    }

    /**
     * Same as {@link #getOutputStream(String, Serde)} using the current default serde.
     *
     * @param str the stream ID
     * @param <M> message type
     * @return the newly registered output stream
     */
    @SuppressWarnings("unchecked")
    public <M> OutputStream<M> getOutputStream(String str) {
        // The default serde is stored as Serde<?>; the caller chooses M, so this cast is unchecked.
        return getOutputStream(str, (Serde<M>) this.defaultSerde);
    }

    /**
     * Sets the context manager for this graph.
     *
     * @param contextManager the context manager to store
     * @return this graph, to allow chaining
     */
    public StreamGraph withContextManager(ContextManager contextManager) {
        this.contextManager = contextManager;
        return this;
    }

    /**
     * Registers the given stream as both an input operator and an output stream and returns a
     * stream object wrapping the pair.
     *
     * <p>Decompiler note: this method was package-private in the compiled class.
     *
     * @param str the stream ID
     * @param serde serde for the stream, or null to use the default serde
     * @param <M> message type
     * @return the intermediate stream wrapping the new input operator and output stream
     * @throws IllegalStateException if the stream is already registered as an input or an output
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String str, Serde<M> serde) {
        StreamSpec streamSpec = this.runner.getStreamSpec(str);
        Preconditions.checkState(!this.inputOperators.containsKey(streamSpec) && !this.outputStreams.containsKey(streamSpec), "getIntermediateStream must not be called multiple times with the same streamId: " + str);

        if (serde == null) {
            LOGGER.info("Using default serde for intermediate stream: " + str);
            serde = (Serde<M>) this.defaultSerde;
        }

        boolean isKeyed = serde instanceof KVSerde;
        KV<Serde, Serde> kvSerdes = getKVSerdes(str, serde);
        String opId = getNextOpId(OperatorSpec.OpCode.INPUT, null);
        this.inputOperators.put(streamSpec, OperatorSpecs.createInputOperatorSpec(streamSpec, (Serde) kvSerdes.getKey(), (Serde) kvSerdes.getValue(), isKeyed, opId));
        this.outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, (Serde) kvSerdes.getKey(), (Serde) kvSerdes.getValue(), isKeyed));
        return new IntermediateMessageStreamImpl<>(this, this.inputOperators.get(streamSpec), this.outputStreams.get(streamSpec));
    }

    /**
     * @return a read-only view of the registered input operators, keyed by stream spec
     */
    public Map<StreamSpec, InputOperatorSpec> getInputOperators() {
        return Collections.unmodifiableMap(this.inputOperators);
    }

    /**
     * @return a read-only view of the registered output streams, keyed by stream spec
     */
    public Map<StreamSpec, OutputStreamImpl> getOutputStreams() {
        return Collections.unmodifiableMap(this.outputStreams);
    }

    /**
     * @return the context manager set via {@link #withContextManager(ContextManager)}, or null if none was set
     */
    public ContextManager getContextManager() {
        return this.contextManager;
    }

    /**
     * Generates a new operator ID of the form {@code <jobName>-<jobId>-<opcode>-<suffix>}, where the
     * suffix is the user-defined ID if one is given and an internal counter otherwise. The job ID
     * falls back to "1" when not configured.
     *
     * <p>Decompiler note: this method was package-private in the compiled class.
     *
     * @param opCode the operator's op code; its lower-cased name becomes part of the ID
     * @param str optional user-defined ID; blank or null means "use the internal counter"
     * @return the generated, previously unused operator ID
     * @throws SamzaException if the user-defined ID contains disallowed characters, or if the
     *         resulting ID was already handed out by this graph
     */
    public String getNextOpId(OperatorSpec.OpCode opCode, String str) {
        boolean hasUserDefinedId = StringUtils.isNotBlank(str);
        if (hasUserDefinedId && !USER_DEFINED_ID_PATTERN.matcher(str).matches()) {
            throw new SamzaException("Operator ID must not contain spaces and special characters: " + str);
        }

        String suffix = hasUserDefinedId ? str.trim() : String.valueOf(this.nextOpNum);
        String opId = String.format("%s-%s-%s-%s",
                this.config.get(JobConfig.JOB_NAME()),
                this.config.get(JobConfig.JOB_ID(), "1"),
                // Locale.ROOT keeps IDs identical regardless of the JVM's default locale
                // (e.g. a Turkish locale would otherwise lower-case 'I' to a dotless 'ı').
                opCode.name().toLowerCase(Locale.ROOT),
                suffix);

        if (!this.operatorIds.add(opId)) {
            throw new SamzaException(String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", opId));
        }
        // The counter advances on every successfully issued ID, including user-defined ones.
        this.nextOpNum++;
        return opId;
    }

    /**
     * Generates a new operator ID with a counter-based suffix.
     *
     * <p>Decompiler note: this method was package-private in the compiled class.
     *
     * @param opCode the operator's op code
     * @return the generated operator ID
     * @see #getNextOpId(OperatorSpec.OpCode, String)
     */
    public String getNextOpId(OperatorSpec.OpCode opCode) {
        return getNextOpId(opCode, null);
    }

    /**
     * Collects every operator spec reachable from the registered input operators, including the
     * input operators themselves.
     *
     * @return a set of all reachable operator specs
     */
    public Collection<OperatorSpec> getAllOperatorSpecs() {
        Set<OperatorSpec> allSpecs = new HashSet<>();
        for (InputOperatorSpec inputOperator : this.inputOperators.values()) {
            allSpecs.add(inputOperator);
            doGetOperatorSpecs(inputOperator, allSpecs);
        }
        return allSpecs;
    }

    /**
     * Recursively adds all operator specs registered downstream of {@code operatorSpec} to {@code set}.
     */
    @SuppressWarnings("unchecked")
    private void doGetOperatorSpecs(OperatorSpec operatorSpec, Set<OperatorSpec> set) {
        // Assign to a typed collection first: operatorSpec is used as a raw type here.
        Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
        for (OperatorSpec registeredSpec : registeredSpecs) {
            set.add(registeredSpec);
            doGetOperatorSpecs(registeredSpec, set);
        }
    }

    /**
     * @return true if any operator spec in the graph has op code {@code WINDOW} or {@code JOIN}
     */
    public boolean hasWindowOrJoins() {
        // anyMatch stops at the first hit instead of materializing every match into a set.
        return getAllOperatorSpecs().stream().anyMatch(spec -> {
            OperatorSpec.OpCode code = spec.getOpCode();
            return code == OperatorSpec.OpCode.WINDOW || code == OperatorSpec.OpCode.JOIN;
        });
    }

    /**
     * Verifies that the serdes requested for a stream equal the ones it was previously registered with.
     *
     * @throws IllegalStateException if either the key or the value serde differs
     */
    private static void checkSerdesMatch(String str, KV<Serde, Serde> kvSerdes, Object existingKeySerde, Object existingValueSerde) {
        boolean sameSerdes = kvSerdes.getKey().equals(existingKeySerde) && kvSerdes.getValue().equals(existingValueSerde);
        Preconditions.checkState(sameSerdes, String.format(SERDE_MISMATCH_MESSAGE, str));
    }

    /**
     * Splits a serde into a key serde and a value serde. A {@link KVSerde} contributes its own
     * key and value serdes; any other serde is used as the value serde with a {@link NoOpSerde} key.
     * Logs when either side ends up being a {@link NoOpSerde}.
     */
    private KV<Serde, Serde> getKVSerdes(String str, Serde serde) {
        Serde keySerde;
        Serde valueSerde;
        if (serde instanceof KVSerde) {
            KVSerde kvSerde = (KVSerde) serde;
            keySerde = kvSerde.getKeySerde();
            valueSerde = kvSerde.getValueSerde();
        } else {
            keySerde = new NoOpSerde();
            valueSerde = serde;
        }

        if (keySerde instanceof NoOpSerde) {
            LOGGER.info("Using NoOpSerde as the key serde for stream " + str + ". Keys will not be (de)serialized");
        }
        if (valueSerde instanceof NoOpSerde) {
            LOGGER.info("Using NoOpSerde as the value serde for stream " + str + ". Values will not be (de)serialized");
        }
        return KV.of(keySerde, valueSerde);
    }
}
