/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.lang.reflect.Array;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.GlobalKTableImpl;
import org.apache.kafka.streams.kstream.internals.GroupedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamBranch;
import org.apache.kafka.streams.kstream.internals.KStreamFilter;
import org.apache.kafka.streams.kstream.internals.KStreamFlatMap;
import org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues;
import org.apache.kafka.streams.kstream.internals.KStreamFlatTransform;
import org.apache.kafka.streams.kstream.internals.KStreamGlobalKTableJoin;
import org.apache.kafka.streams.kstream.internals.KStreamJoinWindow;
import org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin;
import org.apache.kafka.streams.kstream.internals.KStreamKTableJoin;
import org.apache.kafka.streams.kstream.internals.KStreamMap;
import org.apache.kafka.streams.kstream.internals.KStreamMapValues;
import org.apache.kafka.streams.kstream.internals.KStreamPassThrough;
import org.apache.kafka.streams.kstream.internals.KStreamPeek;
import org.apache.kafka.streams.kstream.internals.KStreamTransformValues;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.PrintedInternal;
import org.apache.kafka.streams.kstream.internals.ProducedInternal;
import org.apache.kafka.streams.kstream.internals.SerializedInternal;
import org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

public class KStreamImpl<K, V>
extends AbstractStream<K, V>
implements KStream<K, V> {
    static final String SOURCE_NAME = "KSTREAM-SOURCE-";
    static final String SINK_NAME = "KSTREAM-SINK-";
    static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
    private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
    private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
    private static final String FILTER_NAME = "KSTREAM-FILTER-";
    private static final String PEEK_NAME = "KSTREAM-PEEK-";
    private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
    private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
    private static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
    private static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
    private static final String JOIN_NAME = "KSTREAM-JOIN-";
    private static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
    private static final String MAP_NAME = "KSTREAM-MAP-";
    private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
    private static final String MERGE_NAME = "KSTREAM-MERGE-";
    private static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
    private static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
    private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
    private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
    private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
    private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
    private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
    private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
    private final boolean repartitionRequired;

    KStreamImpl(String name, Serde<K> keySerde, Serde<V> valueSerde, Set<String> sourceNodes, boolean repartitionRequired, StreamsGraphNode streamsGraphNode, InternalStreamsBuilder builder) {
        super(name, keySerde, valueSerde, sourceNodes, streamsGraphNode, builder);
        this.repartitionRequired = repartitionRequired;
    }

    @Override
    public KStream<K, V> filter(Predicate<? super K, ? super V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        String name = this.builder.newProcessorName(FILTER_NAME);
        ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<K, V>(new KStreamFilter<K, V>(predicate, false), name);
        ProcessorGraphNode<? super K, ? super V> filterProcessorNode = new ProcessorGraphNode<K, V>(name, processorParameters, this.repartitionRequired);
        this.builder.addGraphNode(this.streamsGraphNode, filterProcessorNode);
        return new KStreamImpl<K, V>(name, this.keySerde, this.valSerde, this.sourceNodes, this.repartitionRequired, filterProcessorNode, this.builder);
    }

    @Override
    public KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        String name = this.builder.newProcessorName(FILTER_NAME);
        ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<K, V>(new KStreamFilter<K, V>(predicate, true), name);
        ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode = new ProcessorGraphNode<K, V>(name, processorParameters, this.repartitionRequired);
        this.builder.addGraphNode(this.streamsGraphNode, filterNotProcessorNode);
        return new KStreamImpl<K, V>(name, this.keySerde, this.valSerde, this.sourceNodes, this.repartitionRequired, filterNotProcessorNode, this.builder);
    }

    @Override
    public <KR> KStream<KR, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        ProcessorGraphNode<K, V> selectKeyProcessorNode = this.internalSelectKey(mapper);
        selectKeyProcessorNode.keyChangingOperation(true);
        this.builder.addGraphNode(this.streamsGraphNode, selectKeyProcessorNode);
        return new KStreamImpl<K, V>(selectKeyProcessorNode.nodeName(), null, this.valSerde, this.sourceNodes, true, selectKeyProcessorNode, this.builder);
    }

    private <KR> ProcessorGraphNode<K, V> internalSelectKey(KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
        String name = this.builder.newProcessorName(KEY_SELECT_NAME);
        KStreamMap kStreamMap = new KStreamMap((key, value) -> new KeyValue(mapper.apply(key, value), value));
        ProcessorParameters processorParameters = new ProcessorParameters(kStreamMap, name);
        return new ProcessorGraphNode(name, processorParameters, this.repartitionRequired);
    }

    @Override
    public <KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = this.builder.newProcessorName(MAP_NAME);
        ProcessorParameters processorParameters = new ProcessorParameters(new KStreamMap(mapper), name);
        ProcessorGraphNode mapProcessorNode = new ProcessorGraphNode(name, processorParameters, true);
        mapProcessorNode.keyChangingOperation(true);
        this.builder.addGraphNode(this.streamsGraphNode, mapProcessorNode);
        return new KStreamImpl<K, V>(name, null, null, this.sourceNodes, true, mapProcessorNode, this.builder);
    }

    @Override
    public <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper) {
        return this.mapValues(KStreamImpl.withKey(mapper));
    }

    @Override
    public <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = this.builder.newProcessorName(MAPVALUES_NAME);
        ProcessorParameters processorParameters = new ProcessorParameters(new KStreamMapValues<K, V, VR>(mapper), name);
        ProcessorGraphNode mapValuesProcessorNode = new ProcessorGraphNode(name, processorParameters, this.repartitionRequired);
        mapValuesProcessorNode.setValueChangingOperation(true);
        this.builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode);
        return new KStreamImpl<K, V>(name, this.keySerde, null, this.sourceNodes, this.repartitionRequired, mapValuesProcessorNode, this.builder);
    }

    @Override
    public void print(Printed<K, V> printed) {
        Objects.requireNonNull(printed, "printed can't be null");
        PrintedInternal<K, V> printedInternal = new PrintedInternal<K, V>(printed);
        String name = this.builder.newProcessorName(PRINTING_NAME);
        ProcessorParameters<K, V> processorParameters = new ProcessorParameters<K, V>(printedInternal.build(this.name), name);
        ProcessorGraphNode<K, V> printNode = new ProcessorGraphNode<K, V>(name, processorParameters, false);
        this.builder.addGraphNode(this.streamsGraphNode, printNode);
    }

    @Override
    public <KR, VR> KStream<KR, VR> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = this.builder.newProcessorName(FLATMAP_NAME);
        ProcessorParameters processorParameters = new ProcessorParameters(new KStreamFlatMap(mapper), name);
        ProcessorGraphNode flatMapNode = new ProcessorGraphNode(name, processorParameters, true);
        flatMapNode.keyChangingOperation(true);
        this.builder.addGraphNode(this.streamsGraphNode, flatMapNode);
        return new KStreamImpl<K, V>(name, null, null, this.sourceNodes, true, flatMapNode, this.builder);
    }

    @Override
    public <VR> KStream<K, VR> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper) {
        return this.flatMapValues(KStreamImpl.withKey(mapper));
    }

    @Override
    public <VR> KStream<K, VR> flatMapValues(ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = this.builder.newProcessorName(FLATMAPVALUES_NAME);
        ProcessorParameters processorParameters = new ProcessorParameters(new KStreamFlatMapValues(mapper), name);
        ProcessorGraphNode flatMapValuesNode = new ProcessorGraphNode(name, processorParameters, this.repartitionRequired);
        flatMapValuesNode.setValueChangingOperation(true);
        this.builder.addGraphNode(this.streamsGraphNode, flatMapValuesNode);
        return new KStreamImpl<K, V>(name, this.keySerde, null, this.sourceNodes, this.repartitionRequired, flatMapValuesNode, this.builder);
    }

    @Override
    public KStream<K, V>[] branch(Predicate<? super K, ? super V> ... predicates) {
        if (predicates.length == 0) {
            throw new IllegalArgumentException("you must provide at least one predicate");
        }
        for (Predicate<? super K, ? super V> predicate : predicates) {
            Objects.requireNonNull(predicate, "predicates can't have null values");
        }
        String branchName = this.builder.newProcessorName(BRANCH_NAME);
        String[] childNames = new String[predicates.length];
        for (int i = 0; i < predicates.length; ++i) {
            childNames[i] = this.builder.newProcessorName(BRANCHCHILD_NAME);
        }
        ProcessorParameters processorParameters = new ProcessorParameters(new KStreamBranch((Predicate[])predicates.clone(), childNames), branchName);
        ProcessorGraphNode branchNode = new ProcessorGraphNode(branchName, processorParameters, false);
        this.builder.addGraphNode(this.streamsGraphNode, branchNode);
        KStream[] branchChildren = (KStream[])Array.newInstance(KStream.class, predicates.length);
        for (int i = 0; i < predicates.length; ++i) {
            ProcessorParameters innerProcessorParameters = new ProcessorParameters(new KStreamPassThrough(), childNames[i]);
            ProcessorGraphNode branchChildNode = new ProcessorGraphNode(childNames[i], innerProcessorParameters, this.repartitionRequired);
            this.builder.addGraphNode(branchNode, branchChildNode);
            branchChildren[i] = new KStreamImpl<K, V>(childNames[i], this.keySerde, this.valSerde, this.sourceNodes, this.repartitionRequired, branchChildNode, this.builder);
        }
        return branchChildren;
    }

    @Override
    public KStream<K, V> merge(KStream<K, V> stream) {
        Objects.requireNonNull(stream);
        return this.merge(this.builder, stream);
    }

    private KStream<K, V> merge(InternalStreamsBuilder builder, KStream<K, V> stream) {
        KStreamImpl streamImpl = (KStreamImpl)stream;
        String name = builder.newProcessorName(MERGE_NAME);
        HashSet<String> allSourceNodes = new HashSet<String>();
        boolean requireRepartitioning = streamImpl.repartitionRequired || this.repartitionRequired;
        allSourceNodes.addAll(this.sourceNodes);
        allSourceNodes.addAll(streamImpl.sourceNodes);
        ProcessorParameters processorParameters = new ProcessorParameters(new KStreamPassThrough(), name);
        ProcessorGraphNode mergeNode = new ProcessorGraphNode(name, processorParameters, requireRepartitioning);
        mergeNode.setMergeNode(true);
        builder.addGraphNode(Arrays.asList(this.streamsGraphNode, streamImpl.streamsGraphNode), mergeNode);
        return new KStreamImpl<K, V>(name, null, null, allSourceNodes, requireRepartitioning, mergeNode, builder);
    }

    @Override
    public void foreach(ForeachAction<? super K, ? super V> action) {
        Objects.requireNonNull(action, "action can't be null");
        String name = this.builder.newProcessorName(FOREACH_NAME);
        ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<K, V>(new KStreamPeek<K, V>(action, false), name);
        ProcessorGraphNode<? super K, ? super V> foreachNode = new ProcessorGraphNode<K, V>(name, processorParameters, this.repartitionRequired);
        this.builder.addGraphNode(this.streamsGraphNode, foreachNode);
    }

    @Override
    public KStream<K, V> peek(ForeachAction<? super K, ? super V> action) {
        Objects.requireNonNull(action, "action can't be null");
        String name = this.builder.newProcessorName(PEEK_NAME);
        ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<K, V>(new KStreamPeek<K, V>(action, true), name);
        ProcessorGraphNode<? super K, ? super V> peekNode = new ProcessorGraphNode<K, V>(name, processorParameters, this.repartitionRequired);
        this.builder.addGraphNode(this.streamsGraphNode, peekNode);
        return new KStreamImpl<K, V>(name, this.keySerde, this.valSerde, this.sourceNodes, this.repartitionRequired, peekNode, this.builder);
    }

    @Override
    public KStream<K, V> through(String topic) {
        return this.through(topic, Produced.with(this.keySerde, this.valSerde, null));
    }

    @Override
    public KStream<K, V> through(String topic, Produced<K, V> produced) {
        Objects.requireNonNull(topic, "topic can't be null");
        Objects.requireNonNull(produced, "Produced can't be null");
        ProducedInternal<K, V> producedInternal = new ProducedInternal<K, V>(produced);
        if (producedInternal.keySerde() == null) {
            producedInternal.withKeySerde(this.keySerde);
        }
        if (producedInternal.valueSerde() == null) {
            producedInternal.withValueSerde(this.valSerde);
        }
        this.to(topic, producedInternal);
        return this.builder.stream(Collections.singleton(topic), new ConsumedInternal<K, V>(producedInternal.keySerde(), producedInternal.valueSerde(), new FailOnInvalidTimestamp(), null));
    }

    @Override
    public void to(String topic) {
        this.to(topic, Produced.with(this.keySerde, this.valSerde, null));
    }

    @Override
    public void to(String topic, Produced<K, V> produced) {
        Objects.requireNonNull(topic, "topic can't be null");
        Objects.requireNonNull(produced, "Produced can't be null");
        ProducedInternal<K, V> producedInternal = new ProducedInternal<K, V>(produced);
        if (producedInternal.keySerde() == null) {
            producedInternal.withKeySerde(this.keySerde);
        }
        if (producedInternal.valueSerde() == null) {
            producedInternal.withValueSerde(this.valSerde);
        }
        this.to((TopicNameExtractor<K, V>)new StaticTopicNameExtractor(topic), producedInternal);
    }

    @Override
    public void to(TopicNameExtractor<K, V> topicExtractor) {
        this.to(topicExtractor, Produced.with(this.keySerde, this.valSerde, null));
    }

    @Override
    public void to(TopicNameExtractor<K, V> topicExtractor, Produced<K, V> produced) {
        Objects.requireNonNull(topicExtractor, "topic extractor can't be null");
        Objects.requireNonNull(produced, "Produced can't be null");
        ProducedInternal<K, V> producedInternal = new ProducedInternal<K, V>(produced);
        if (producedInternal.keySerde() == null) {
            producedInternal.withKeySerde(this.keySerde);
        }
        if (producedInternal.valueSerde() == null) {
            producedInternal.withValueSerde(this.valSerde);
        }
        this.to(topicExtractor, producedInternal);
    }

    @Override
    private void to(TopicNameExtractor<K, V> topicExtractor, ProducedInternal<K, V> produced) {
        String name = this.builder.newProcessorName(SINK_NAME);
        StreamSinkNode<K, V> sinkNode = new StreamSinkNode<K, V>(name, topicExtractor, produced);
        this.builder.addGraphNode(this.streamsGraphNode, sinkNode);
    }

    @Override
    public <KR, VR> KStream<KR, VR> transform(TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        return this.flatTransform(new TransformerSupplierAdapter<K, V, KR, VR>(transformerSupplier), stateStoreNames);
    }

    @Override
    public <K1, V1> KStream<K1, V1> flatTransform(TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        String name = this.builder.newProcessorName(TRANSFORM_NAME);
        StatefulProcessorNode transformNode = new StatefulProcessorNode(name, new ProcessorParameters(new KStreamFlatTransform<K, V, K1, V1>(transformerSupplier), name), stateStoreNames, true);
        transformNode.keyChangingOperation(true);
        this.builder.addGraphNode(this.streamsGraphNode, transformNode);
        return new KStreamImpl<K, V>(name, null, null, this.sourceNodes, true, transformNode, this.builder);
    }

    @Override
    public <VR> KStream<K, VR> transformValues(ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
        return this.doTransformValues(KStreamImpl.toValueTransformerWithKeySupplier(valueTransformerSupplier), stateStoreNames);
    }

    @Override
    public <VR> KStream<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
        return this.doTransformValues(valueTransformerSupplier, stateStoreNames);
    }

    private <VR> KStream<K, VR> doTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier, String ... stateStoreNames) {
        String name = this.builder.newProcessorName(TRANSFORMVALUES_NAME);
        StatefulProcessorNode transformNode = new StatefulProcessorNode(name, new ProcessorParameters(new KStreamTransformValues<K, V, VR>(valueTransformerWithKeySupplier), name), stateStoreNames, this.repartitionRequired);
        transformNode.setValueChangingOperation(true);
        this.builder.addGraphNode(this.streamsGraphNode, transformNode);
        return new KStreamImpl<K, V>(name, this.keySerde, null, this.sourceNodes, this.repartitionRequired, transformNode, this.builder);
    }

    @Override
    public void process(ProcessorSupplier<? super K, ? super V> processorSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(processorSupplier, "ProcessSupplier cant' be null");
        String name = this.builder.newProcessorName(PROCESSOR_NAME);
        StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<K, V>(name, new ProcessorParameters<K, V>(processorSupplier, name), stateStoreNames, this.repartitionRequired);
        this.builder.addGraphNode(this.streamsGraphNode, processNode);
    }

    @Override
    public <VO, VR> KStream<K, VR> join(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) {
        return this.join(other, joiner, windows, Joined.with(null, null, null));
    }

    @Override
    public <VO, VR> KStream<K, VR> join(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, Joined<K, V, VO> joined) {
        return this.doJoin(otherStream, joiner, windows, joined, new KStreamImplJoin(false, false));
    }

    @Override
    public <VO, VR> KStream<K, VR> outerJoin(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) {
        return this.outerJoin(other, joiner, windows, Joined.with(null, null, null));
    }

    @Override
    public <VO, VR> KStream<K, VR> outerJoin(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, Joined<K, V, VO> joined) {
        return this.doJoin(other, joiner, windows, joined, new KStreamImplJoin(true, true));
    }

    private <VO, VR> KStream<K, VR> doJoin(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, Joined<K, V, VO> joined, KStreamImplJoin join) {
        Objects.requireNonNull(other, "other KStream can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(windows, "windows can't be null");
        Objects.requireNonNull(joined, "joined can't be null");
        KStreamImpl<K, V> joinThis = this;
        KStreamImpl<K, VO> joinOther = (KStreamImpl<K, VO>)other;
        if (joinThis.repartitionRequired) {
            String leftJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-left" : joinThis.name;
            joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, joined.keySerde(), joined.valueSerde());
        }
        if (joinOther.repartitionRequired) {
            String rightJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-right" : joinOther.name;
            joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, joined.keySerde(), joined.otherValueSerde());
        }
        joinThis.ensureJoinableWith(joinOther);
        return join.join(joinThis, joinOther, joiner, windows, joined);
    }

    private KStreamImpl<K, V> repartitionForJoin(String repartitionName, Serde<K> keySerdeOverride, Serde<V> valueSerdeOverride) {
        Serde repartitionKeySerde = keySerdeOverride != null ? keySerdeOverride : this.keySerde;
        Serde repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : this.valSerde;
        OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder optimizableRepartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
        String repartitionedSourceName = KStreamImpl.createRepartitionedSource(this.builder, repartitionKeySerde, repartitionValueSerde, repartitionName, optimizableRepartitionNodeBuilder);
        OptimizableRepartitionNode optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
        this.builder.addGraphNode(this.streamsGraphNode, optimizableRepartitionNode);
        return new KStreamImpl<K, V>(repartitionedSourceName, repartitionKeySerde, repartitionValueSerde, Collections.singleton(repartitionedSourceName), false, optimizableRepartitionNode, this.builder);
    }

    static <K1, V1> String createRepartitionedSource(InternalStreamsBuilder builder, Serde<K1> keySerde, Serde<V1> valSerde, String repartitionTopicNamePrefix, OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K1, V1> optimizableRepartitionNodeBuilder) {
        String repartitionTopic = repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX;
        String sinkName = builder.newProcessorName(SINK_NAME);
        String nullKeyFilterProcessorName = builder.newProcessorName(FILTER_NAME);
        String sourceName = builder.newProcessorName(SOURCE_NAME);
        Predicate<Object, Object> notNullKeyPredicate = (k, v) -> k != null;
        ProcessorParameters<Object, Object> processorParameters = new ProcessorParameters<Object, Object>(new KStreamFilter<Object, Object>(notNullKeyPredicate, false), nullKeyFilterProcessorName);
        optimizableRepartitionNodeBuilder.withKeySerde(keySerde).withValueSerde(valSerde).withSourceName(sourceName).withRepartitionTopic(repartitionTopic).withSinkName(sinkName).withProcessorParameters(processorParameters).withNodeName(sourceName);
        return sourceName;
    }

    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) {
        return this.leftJoin(other, joiner, windows, Joined.with(null, null, null));
    }

    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KStream<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, Joined<K, V, VO> joined) {
        Objects.requireNonNull(joined, "joined can't be null");
        return this.doJoin(other, joiner, windows, joined, new KStreamImplJoin(true, false));
    }

    @Override
    public <VO, VR> KStream<K, VR> join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
        return this.join(other, joiner, (Joined<K, V, VO>)Joined.with(null, null, null));
    }

    @Override
    public <VO, VR> KStream<K, VR> join(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Joined<K, V, VO> joined) {
        Objects.requireNonNull(other, "other can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(joined, "joined can't be null");
        if (this.repartitionRequired) {
            KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(joined.name() != null ? joined.name() : this.name, joined.keySerde(), joined.valueSerde());
            return super.doStreamTableJoin(other, joiner, joined, false);
        }
        return this.doStreamTableJoin(other, joiner, joined, false);
    }

    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
        return this.leftJoin(other, joiner, (Joined<K, V, VO>)Joined.with(null, null, null));
    }

    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Joined<K, V, VO> joined) {
        Objects.requireNonNull(other, "other can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(joined, "joined can't be null");
        if (this.repartitionRequired) {
            KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(joined.name() != null ? joined.name() : this.name, joined.keySerde(), joined.valueSerde());
            return super.doStreamTableJoin(other, joiner, joined, true);
        }
        return this.doStreamTableJoin(other, joiner, joined, true);
    }

    @Override
    public <KG, VG, VR> KStream<K, VR> join(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper, ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
        return this.globalTableJoin(globalTable, keyMapper, joiner, false);
    }

    @Override
    public <KG, VG, VR> KStream<K, VR> leftJoin(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper, ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
        return this.globalTableJoin(globalTable, keyMapper, joiner, true);
    }

    private <KG, VG, VR> KStream<K, VR> globalTableJoin(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper, ValueJoiner<? super V, ? super VG, ? extends VR> joiner, boolean leftJoin) {
        Objects.requireNonNull(globalTable, "globalTable can't be null");
        Objects.requireNonNull(keyMapper, "keyMapper can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        KTableValueGetterSupplier valueGetterSupplier = ((GlobalKTableImpl)globalTable).valueGetterSupplier();
        String name = this.builder.newProcessorName(LEFTJOIN_NAME);
        KStreamGlobalKTableJoin<? super K, ? extends KG, ? extends VR, ? super V, ? super VG> processorSupplier = new KStreamGlobalKTableJoin<K, KG, VR, V, VG>(valueGetterSupplier, joiner, keyMapper, leftJoin);
        ProcessorParameters processorParameters = new ProcessorParameters(processorSupplier, name);
        StreamTableJoinNode streamTableJoinNode = new StreamTableJoinNode(name, processorParameters, new String[0], null);
        this.builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
        return new KStreamImpl<K, V>(name, this.keySerde, null, this.sourceNodes, this.repartitionRequired, streamTableJoinNode, this.builder);
    }

    private <VO, VR> KStream<K, VR> doStreamTableJoin(KTable<K, VO> other, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Joined<K, V, VO> joined, boolean leftJoin) {
        Objects.requireNonNull(other, "other KTable can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Set<String> allSourceNodes = this.ensureJoinableWith((AbstractStream)((Object)other));
        String name = this.builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
        KStreamKTableJoin processorSupplier = new KStreamKTableJoin(((KTableImpl)other).valueGetterSupplier(), joiner, leftJoin);
        ProcessorParameters processorParameters = new ProcessorParameters(processorSupplier, name);
        StreamTableJoinNode streamTableJoinNode = new StreamTableJoinNode(name, processorParameters, ((KTableImpl)other).valueGetterSupplier().storeNames(), this.name);
        this.builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
        return new KStreamImpl<K, V>(name, joined.keySerde() != null ? joined.keySerde() : this.keySerde, null, allSourceNodes, false, streamTableJoinNode, this.builder);
    }

    @Override
    public <KR> KGroupedStream<KR, V> groupBy(KeyValueMapper<? super K, ? super V, KR> selector) {
        return this.groupBy(selector, Grouped.with(null, this.valSerde));
    }

    @Override
    @Deprecated
    public <KR> KGroupedStream<KR, V> groupBy(KeyValueMapper<? super K, ? super V, KR> selector, Serialized<KR, V> serialized) {
        Objects.requireNonNull(selector, "selector can't be null");
        Objects.requireNonNull(serialized, "serialized can't be null");
        SerializedInternal<KR, V> serializedInternal = new SerializedInternal<KR, V>(serialized);
        return this.groupBy(selector, Grouped.with(serializedInternal.keySerde(), serializedInternal.valueSerde()));
    }

    @Override
    public <KR> KGroupedStream<KR, V> groupBy(KeyValueMapper<? super K, ? super V, KR> selector, Grouped<KR, V> grouped) {
        Objects.requireNonNull(selector, "selector can't be null");
        Objects.requireNonNull(grouped, "grouped can't be null");
        GroupedInternal<KR, V> groupedInternal = new GroupedInternal<KR, V>(grouped);
        ProcessorGraphNode<K, V> selectKeyMapNode = this.internalSelectKey(selector);
        selectKeyMapNode.keyChangingOperation(true);
        this.builder.addGraphNode(this.streamsGraphNode, selectKeyMapNode);
        return new KGroupedStreamImpl<KR, V>(selectKeyMapNode.nodeName(), this.sourceNodes, groupedInternal, true, selectKeyMapNode, this.builder);
    }

    @Override
    public KGroupedStream<K, V> groupByKey() {
        return this.groupByKey(Grouped.with(this.keySerde, this.valSerde));
    }

    @Override
    @Deprecated
    public KGroupedStream<K, V> groupByKey(Serialized<K, V> serialized) {
        SerializedInternal<K, V> serializedInternal = new SerializedInternal<K, V>(serialized);
        return this.groupByKey(Grouped.with(serializedInternal.keySerde(), serializedInternal.valueSerde()));
    }

    @Override
    public KGroupedStream<K, V> groupByKey(Grouped<K, V> grouped) {
        GroupedInternal<K, V> groupedInternal = new GroupedInternal<K, V>(grouped);
        return new KGroupedStreamImpl<K, V>(this.name, this.sourceNodes, groupedInternal, this.repartitionRequired, this.streamsGraphNode, this.builder);
    }

    private static <K, V> StoreBuilder<WindowStore<K, V>> joinWindowStoreBuilder(String joinName, JoinWindows windows, Serde<K> keySerde, Serde<V> valueSerde) {
        return Stores.windowStoreBuilder(Stores.persistentWindowStore(joinName + "-store", Duration.ofMillis(windows.size() + windows.gracePeriodMs()), Duration.ofMillis(windows.size()), true), keySerde, valueSerde);
    }

    private class KStreamImplJoin {
        private final boolean leftOuter;
        private final boolean rightOuter;

        KStreamImplJoin(boolean leftOuter, boolean rightOuter) {
            this.leftOuter = leftOuter;
            this.rightOuter = rightOuter;
        }

        public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs, KStream<K1, V2> other, ValueJoiner<? super V1, ? super V2, ? extends R> joiner, JoinWindows windows, Joined<K1, V1, V2> joined) {
            String thisWindowStreamName = KStreamImpl.this.builder.newProcessorName(KStreamImpl.WINDOWED_NAME);
            String otherWindowStreamName = KStreamImpl.this.builder.newProcessorName(KStreamImpl.WINDOWED_NAME);
            String joinThisName = this.rightOuter ? KStreamImpl.this.builder.newProcessorName(KStreamImpl.OUTERTHIS_NAME) : KStreamImpl.this.builder.newProcessorName(KStreamImpl.JOINTHIS_NAME);
            String joinOtherName = this.leftOuter ? KStreamImpl.this.builder.newProcessorName(KStreamImpl.OUTEROTHER_NAME) : KStreamImpl.this.builder.newProcessorName(KStreamImpl.JOINOTHER_NAME);
            String joinMergeName = KStreamImpl.this.builder.newProcessorName(KStreamImpl.MERGE_NAME);
            StreamsGraphNode thisStreamsGraphNode = ((AbstractStream)((Object)lhs)).streamsGraphNode;
            StreamsGraphNode otherStreamsGraphNode = ((AbstractStream)((Object)other)).streamsGraphNode;
            StoreBuilder thisWindowStore = KStreamImpl.joinWindowStoreBuilder(joinThisName, windows, joined.keySerde(), joined.valueSerde());
            StoreBuilder otherWindowStore = KStreamImpl.joinWindowStoreBuilder(joinOtherName, windows, joined.keySerde(), joined.otherValueSerde());
            KStreamJoinWindow thisWindowedStream = new KStreamJoinWindow(thisWindowStore.name());
            ProcessorParameters thisWindowStreamProcessorParams = new ProcessorParameters(thisWindowedStream, thisWindowStreamName);
            ProcessorGraphNode thisWindowedStreamsNode = new ProcessorGraphNode(thisWindowStreamName, thisWindowStreamProcessorParams);
            KStreamImpl.this.builder.addGraphNode(thisStreamsGraphNode, thisWindowedStreamsNode);
            KStreamJoinWindow otherWindowedStream = new KStreamJoinWindow(otherWindowStore.name());
            ProcessorParameters otherWindowStreamProcessorParams = new ProcessorParameters(otherWindowedStream, otherWindowStreamName);
            ProcessorGraphNode otherWindowedStreamsNode = new ProcessorGraphNode(otherWindowStreamName, otherWindowStreamProcessorParams);
            KStreamImpl.this.builder.addGraphNode(otherStreamsGraphNode, otherWindowedStreamsNode);
            KStreamKStreamJoin joinThis = new KStreamKStreamJoin(otherWindowStore.name(), windows.beforeMs, windows.afterMs, joiner, this.leftOuter);
            KStreamKStreamJoin joinOther = new KStreamKStreamJoin(thisWindowStore.name(), windows.afterMs, windows.beforeMs, AbstractStream.reverseJoiner(joiner), this.rightOuter);
            KStreamPassThrough joinMerge = new KStreamPassThrough();
            StreamStreamJoinNode.StreamStreamJoinNodeBuilder joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
            ProcessorParameters joinThisProcessorParams = new ProcessorParameters(joinThis, joinThisName);
            ProcessorParameters joinOtherProcessorParams = new ProcessorParameters(joinOther, joinOtherName);
            ProcessorParameters joinMergeProcessorParams = new ProcessorParameters(joinMerge, joinMergeName);
            joinBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParams).withJoinThisProcessorParameters(joinThisProcessorParams).withJoinOtherProcessorParameters(joinOtherProcessorParams).withThisWindowStoreBuilder(thisWindowStore).withOtherWindowStoreBuilder(otherWindowStore).withThisWindowedStreamProcessorParameters(thisWindowStreamProcessorParams).withOtherWindowedStreamProcessorParameters(otherWindowStreamProcessorParams).withValueJoiner(joiner).withNodeName(joinMergeName);
            StreamStreamJoinNode joinGraphNode = joinBuilder.build();
            KStreamImpl.this.builder.addGraphNode(Arrays.asList(thisStreamsGraphNode, otherStreamsGraphNode), joinGraphNode);
            HashSet<String> allSourceNodes = new HashSet<String>(((KStreamImpl)lhs).sourceNodes);
            allSourceNodes.addAll(((KStreamImpl)other).sourceNodes);
            return new KStreamImpl(joinMergeName, joined.keySerde(), null, allSourceNodes, false, joinGraphNode, KStreamImpl.this.builder);
        }
    }
}

