/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamJoinWindow;
import org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin;
import org.apache.kafka.streams.kstream.internals.NamedInternal;
import org.apache.kafka.streams.kstream.internals.PassThrough;
import org.apache.kafka.streams.kstream.internals.StreamJoinedInternal;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;

/**
 * Assembles the graph nodes for a windowed join between two {@link KStream}s.
 *
 * <p>{@code leftOuter} is handed to the join processor that reads the other stream's window
 * store, and {@code rightOuter} to the join processor that reads this stream's window store.
 * Note that the processor <em>names</em> are selected the other way round: the "this" join
 * name depends on {@code rightOuter} and the "other" join name on {@code leftOuter}.
 */
class KStreamImplJoin {
    private final InternalStreamsBuilder builder;
    private final boolean leftOuter;
    private final boolean rightOuter;

    /**
     * @param builder    builder that receives the graph nodes and supplies generated names
     * @param leftOuter  outer flag forwarded to the join processor created for {@code lhs}
     * @param rightOuter outer flag forwarded to the join processor created for {@code other}
     */
    KStreamImplJoin(final InternalStreamsBuilder builder, final boolean leftOuter, final boolean rightOuter) {
        this.builder = builder;
        this.leftOuter = leftOuter;
        this.rightOuter = rightOuter;
    }

    /**
     * Wires a windowed join of {@code lhs} and {@code other} into the topology graph.
     *
     * <p>Two windowing processors (one per input), two join processors and one pass-through
     * merge processor are bundled into a {@link StreamStreamJoinNode} that is attached below
     * both inputs' graph nodes.
     *
     * @param lhs          left-hand stream; cast to {@link AbstractStream} and {@link KStreamImpl}
     * @param other        right-hand stream; cast to {@link AbstractStream} and {@link KStreamImpl}
     * @param joiner       value joiner; used as-is for the this-side join and reversed via
     *                     {@link AbstractStream#reverseJoiner} for the other-side join
     * @param windows      join windows supplying before/after bounds, size and grace period
     * @param streamJoined user configuration (name, store name, store suppliers, serdes)
     * @return a stream named after the merge processor, carrying the configured key serde,
     *         a {@code null} value serde and the union of both inputs' sub-topology source nodes
     * @throws StreamsException if both store suppliers share a name, or a supplied store does
     *                          not match {@code windows}
     */
    public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs,
                                               final KStream<K1, V2> other,
                                               final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
                                               final JoinWindows windows,
                                               final StreamJoined<K1, V1, V2> streamJoined) {
        final StreamJoinedInternal<K1, V1, V2> joinedConfig = new StreamJoinedInternal<>(streamJoined);
        final NamedInternal naming = new NamedInternal(joinedConfig.name());

        // Pick the suffix/prefix pair for each join side up front; these are plain constants.
        final String thisJoinSuffix;
        final String thisJoinPrefix;
        if (this.rightOuter) {
            thisJoinSuffix = "-outer-this-join";
            thisJoinPrefix = "KSTREAM-OUTERTHIS-";
        } else {
            thisJoinSuffix = "-this-join";
            thisJoinPrefix = "KSTREAM-JOINTHIS-";
        }
        final String otherJoinSuffix;
        final String otherJoinPrefix;
        if (this.leftOuter) {
            otherJoinSuffix = "-outer-other-join";
            otherJoinPrefix = "KSTREAM-OUTEROTHER-";
        } else {
            otherJoinSuffix = "-other-join";
            otherJoinPrefix = "KSTREAM-JOINOTHER-";
        }

        // NOTE(review): the sequence of name requests below is kept exactly as before;
        // presumably the builder numbers generated names in request order - confirm before reordering.
        final String thisWindowedName = naming.suffixWithOrElseGet("-this-windowed", this.builder, "KSTREAM-WINDOWED-");
        final String otherWindowedName = naming.suffixWithOrElseGet("-other-windowed", this.builder, "KSTREAM-WINDOWED-");
        final String generatedThisJoinName = this.builder.newProcessorName(thisJoinPrefix);
        final String generatedOtherJoinName = this.builder.newProcessorName(otherJoinPrefix);
        final String thisJoinName = naming.suffixWithOrElseGet(thisJoinSuffix, generatedThisJoinName);
        final String otherJoinName = naming.suffixWithOrElseGet(otherJoinSuffix, generatedOtherJoinName);
        final String mergeName = naming.suffixWithOrElseGet("-merge", this.builder, "KSTREAM-MERGE-");

        final StreamsGraphNode thisParentNode = ((AbstractStream<?, ?>) lhs).streamsGraphNode;
        final StreamsGraphNode otherParentNode = ((AbstractStream<?, ?>) other).streamsGraphNode;

        final String baseStoreName = joinedConfig.storeName();
        final WindowBytesStoreSupplier thisSupplier = joinedConfig.thisStoreSupplier();
        final WindowBytesStoreSupplier otherSupplier = joinedConfig.otherStoreSupplier();
        assertUniqueStoreNames(thisSupplier, otherSupplier);

        // A user-supplied store is validated against the join windows; otherwise a default one is built.
        final StoreBuilder<WindowStore<K1, V1>> thisStore;
        if (thisSupplier != null) {
            assertWindowSettings(thisSupplier, windows);
            thisStore = joinWindowStoreBuilderFromSupplier(thisSupplier, joinedConfig.keySerde(), joinedConfig.valueSerde());
        } else {
            thisStore = joinWindowStoreBuilder(
                defaultStoreBaseName(baseStoreName, thisJoinSuffix, generatedThisJoinName),
                windows,
                joinedConfig.keySerde(),
                joinedConfig.valueSerde());
        }

        final StoreBuilder<WindowStore<K1, V2>> otherStore;
        if (otherSupplier != null) {
            assertWindowSettings(otherSupplier, windows);
            otherStore = joinWindowStoreBuilderFromSupplier(otherSupplier, joinedConfig.keySerde(), joinedConfig.otherValueSerde());
        } else {
            otherStore = joinWindowStoreBuilder(
                defaultStoreBaseName(baseStoreName, otherJoinSuffix, generatedOtherJoinName),
                windows,
                joinedConfig.keySerde(),
                joinedConfig.otherValueSerde());
        }

        // Windowing processor for each input, each bound to that side's own store.
        final KStreamJoinWindow thisWindowProcessor = new KStreamJoinWindow(thisStore.name());
        final ProcessorParameters thisWindowParams = new ProcessorParameters(thisWindowProcessor, thisWindowedName);
        final ProcessorGraphNode thisWindowNode = new ProcessorGraphNode(thisWindowedName, thisWindowParams);
        this.builder.addGraphNode(thisParentNode, thisWindowNode);

        final KStreamJoinWindow otherWindowProcessor = new KStreamJoinWindow(otherStore.name());
        final ProcessorParameters otherWindowParams = new ProcessorParameters(otherWindowProcessor, otherWindowedName);
        final ProcessorGraphNode otherWindowNode = new ProcessorGraphNode(otherWindowedName, otherWindowParams);
        this.builder.addGraphNode(otherParentNode, otherWindowNode);

        // Each join processor is given the opposite side's store name; the other-side join
        // swaps the before/after bounds and uses the reversed joiner.
        final KStreamKStreamJoin thisJoinProcessor =
            new KStreamKStreamJoin(otherStore.name(), windows.beforeMs, windows.afterMs, joiner, this.leftOuter);
        final KStreamKStreamJoin otherJoinProcessor =
            new KStreamKStreamJoin(thisStore.name(), windows.afterMs, windows.beforeMs, AbstractStream.reverseJoiner(joiner), this.rightOuter);
        final PassThrough mergeProcessor = new PassThrough();

        final StreamStreamJoinNode.StreamStreamJoinNodeBuilder nodeBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
        final ProcessorParameters thisJoinParams = new ProcessorParameters(thisJoinProcessor, thisJoinName);
        final ProcessorParameters otherJoinParams = new ProcessorParameters(otherJoinProcessor, otherJoinName);
        final ProcessorParameters mergeParams = new ProcessorParameters(mergeProcessor, mergeName);
        nodeBuilder
            .withJoinMergeProcessorParameters(mergeParams)
            .withJoinThisProcessorParameters(thisJoinParams)
            .withJoinOtherProcessorParameters(otherJoinParams)
            .withThisWindowStoreBuilder(thisStore)
            .withOtherWindowStoreBuilder(otherStore)
            .withThisWindowedStreamProcessorParameters(thisWindowParams)
            .withOtherWindowedStreamProcessorParameters(otherWindowParams)
            .withValueJoiner(joiner)
            .withNodeName(mergeName);
        final StreamStreamJoinNode joinNode = nodeBuilder.build();
        this.builder.addGraphNode(Arrays.asList(thisParentNode, otherParentNode), joinNode);

        final HashSet<String> mergedSourceNodes = new HashSet<>(((KStreamImpl<K1, V1>) lhs).subTopologySourceNodes);
        mergedSourceNodes.addAll(((KStreamImpl<K1, V2>) other).subTopologySourceNodes);

        // The value serde is passed as null; the key serde comes from the StreamJoined config.
        // NOTE(review): the boolean argument is presumably "repartition required" - confirm
        // against the KStreamImpl constructor.
        return new KStreamImpl<>(mergeName, joinedConfig.keySerde(), null, mergedSourceNodes, false, joinNode, this.builder);
    }

    /**
     * Chooses the base name for a default join store: the generated processor name when no
     * store name was configured, otherwise the configured name followed by the side's suffix.
     */
    private static String defaultStoreBaseName(final String userProvidedBaseStoreName,
                                               final String joinSuffix,
                                               final String generatedName) {
        if (userProvidedBaseStoreName == null) {
            return generatedName;
        }
        return userProvidedBaseStoreName + joinSuffix;
    }

    /**
     * Verifies that a user-supplied store fits the join windows.
     *
     * @throws StreamsException if the supplier does not retain duplicates, if its retention
     *                          period differs from window size plus grace period, or if its
     *                          window size differs from the join window size
     */
    private void assertWindowSettings(final WindowBytesStoreSupplier supplier, final JoinWindows joinWindows) {
        if (!supplier.retainDuplicates()) {
            throw new StreamsException("The StoreSupplier must set retainDuplicates=true, found retainDuplicates=false");
        }
        // The window size is only consulted when the retention period already matches.
        if (supplier.retentionPeriod() != joinWindows.size() + joinWindows.gracePeriodMs()
                || supplier.windowSize() != joinWindows.size()) {
            throw new StreamsException(String.format("Window settings mismatch. WindowBytesStoreSupplier settings %s must match JoinWindows settings %s", supplier, joinWindows));
        }
    }

    /**
     * Rejects a pair of user-supplied stores that report the same name. Nothing is checked
     * when either supplier is {@code null}.
     *
     * @throws StreamsException if both suppliers are present and their names are equal
     */
    private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier, final WindowBytesStoreSupplier otherSupplier) {
        if (supplier == null || otherSupplier == null) {
            return;
        }
        if (supplier.name().equals(otherSupplier.name())) {
            throw new StreamsException("Both StoreSuppliers have the same name.  StoreSuppliers must provide unique names");
        }
    }

    /**
     * Builds the default persistent window store for one side of the join. The store is named
     * {@code storeName + "-store"}, retains data for window size plus grace period, uses the
     * join window size as its window size, and is created with the final boolean argument
     * (retain duplicates) set to {@code true}.
     */
    private static <K, V> StoreBuilder<WindowStore<K, V>> joinWindowStoreBuilder(final String storeName,
                                                                                 final JoinWindows windows,
                                                                                 final Serde<K> keySerde,
                                                                                 final Serde<V> valueSerde) {
        final Duration retention = Duration.ofMillis(windows.size() + windows.gracePeriodMs());
        final Duration windowSize = Duration.ofMillis(windows.size());
        final WindowBytesStoreSupplier defaultSupplier =
            Stores.persistentWindowStore(storeName + "-store", retention, windowSize, true);
        return Stores.windowStoreBuilder(defaultSupplier, keySerde, valueSerde);
    }

    /**
     * Wraps a user-supplied window store supplier in a store builder using the given serdes.
     */
    private static <K, V> StoreBuilder<WindowStore<K, V>> joinWindowStoreBuilderFromSupplier(final WindowBytesStoreSupplier storeSupplier,
                                                                                             final Serde<K> keySerde,
                                                                                             final Serde<V> valueSerde) {
        return Stores.windowStoreBuilder(storeSupplier, keySerde, valueSerde);
    }
}

