/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KStreamAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamReduce;
import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamWindowReduce;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

/**
 * {@link KGroupedStream} implementation that wires reduce / aggregate / count
 * operations into the underlying {@link KStreamBuilder} topology.
 *
 * <p>Every operation follows the same steps (see {@code doAggregate}): pick a
 * fresh processor name, optionally insert a repartitioned source, register the
 * aggregation processor and its state store, and wrap the result in a
 * {@link KTableImpl}.
 *
 * @param <K> key type of the grouped stream
 * @param <V> value type of the grouped stream
 */
public class KGroupedStreamImpl<K, V>
extends AbstractStream<K>
implements KGroupedStream<K, V> {
    /** Name prefix handed to {@code topology.newName} for reduce processors. */
    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
    /** Name prefix handed to {@code topology.newName} for aggregate (and count) processors. */
    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
    private final Serde<K> keySerde;
    private final Serde<V> valSerde;
    /** When {@code true}, aggregations read from a repartitioned source instead of {@code this.name}. */
    private final boolean repartitionRequired;

    /**
     * Creates a grouped stream view over the given topology node.
     *
     * @param topology            builder that processors and state stores are registered with
     * @param name                name of the node this grouped stream reads from
     * @param sourceNodes         source node names backing this stream
     * @param keySerde            serde used for keys in state stores and repartitioning
     * @param valSerde            serde used for values in reduce stores and repartitioning
     * @param repartitionRequired whether a repartitioned source must be created before aggregating
     */
    public KGroupedStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes, Serde<K> keySerde, Serde<V> valSerde, boolean repartitionRequired) {
        super(topology, name, sourceNodes);
        this.keySerde = keySerde;
        this.valSerde = valSerde;
        this.repartitionRequired = repartitionRequired;
    }

    /**
     * Reduces the values of each key into a single value backed by a key-value store.
     *
     * @param reducer   combining function; must not be null
     * @param storeName name of the backing state store; must not be null
     * @return table of the reduced value per key
     * @throws NullPointerException if {@code reducer} or {@code storeName} is null
     */
    @Override
    public KTable<K, V> reduce(Reducer<V> reducer, String storeName) {
        Objects.requireNonNull(reducer, "reducer can't be null");
        Objects.requireNonNull(storeName, "storeName can't be null");
        return this.doAggregate(
                new KStreamReduce<K, V>(storeName, reducer),
                REDUCE_NAME,
                this.keyValueStore(this.valSerde, storeName));
    }

    /**
     * Reduces the values of each key per window, backed by a windowed store.
     *
     * @param reducer   combining function; must not be null
     * @param windows   window specification; must not be null
     * @param storeName name of the backing state store; must not be null
     * @return table of the reduced value per windowed key
     * @throws NullPointerException if any argument is null
     */
    @SuppressWarnings("unchecked")
    @Override
    public <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer, Windows<W> windows, String storeName) {
        Objects.requireNonNull(reducer, "reducer can't be null");
        Objects.requireNonNull(windows, "windows can't be null");
        Objects.requireNonNull(storeName, "storeName can't be null");
        // doAggregate is typed on the un-windowed key K, so the windowed result
        // has to be re-typed with an explicit unchecked cast.
        return (KTable<Windowed<K>, V>) this.doAggregate(
                new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
                REDUCE_NAME,
                this.windowedStore(this.valSerde, windows, storeName));
    }

    /**
     * Aggregates the values of each key into a value of type {@code T}, backed by a key-value store.
     *
     * @param initializer   supplies the initial aggregate; must not be null
     * @param aggregator    folds a record into the current aggregate; must not be null
     * @param aggValueSerde serde for the aggregate value (not null-checked here)
     * @param storeName     name of the backing state store; must not be null
     * @return table of the aggregate per key
     * @throws NullPointerException if {@code initializer}, {@code aggregator} or {@code storeName} is null
     */
    @Override
    public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<K, V, T> aggregator, Serde<T> aggValueSerde, String storeName) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(aggregator, "aggregator can't be null");
        Objects.requireNonNull(storeName, "storeName can't be null");
        return this.doAggregate(
                new KStreamAggregate<K, V, T>(storeName, initializer, aggregator),
                AGGREGATE_NAME,
                this.keyValueStore(aggValueSerde, storeName));
    }

    /**
     * Aggregates the values of each key per window, backed by a windowed store.
     *
     * @param initializer   supplies the initial aggregate; must not be null
     * @param aggregator    folds a record into the current aggregate; must not be null
     * @param windows       window specification; must not be null
     * @param aggValueSerde serde for the aggregate value (not null-checked here)
     * @param storeName     name of the backing state store; must not be null
     * @return table of the aggregate per windowed key
     * @throws NullPointerException if {@code initializer}, {@code aggregator}, {@code windows}
     *                              or {@code storeName} is null
     */
    @SuppressWarnings("unchecked")
    @Override
    public <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer, Aggregator<K, V, T> aggregator, Windows<W> windows, Serde<T> aggValueSerde, String storeName) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(aggregator, "aggregator can't be null");
        Objects.requireNonNull(windows, "windows can't be null");
        Objects.requireNonNull(storeName, "storeName can't be null");
        // doAggregate returns KTable<K, T>; without this cast the return type
        // KTable<Windowed<K>, T> does not type-check.
        return (KTable<Windowed<K>, T>) this.doAggregate(
                new KStreamWindowAggregate<K, V, T, W>(windows, storeName, initializer, aggregator),
                AGGREGATE_NAME,
                this.windowedStore(aggValueSerde, windows, storeName));
    }

    /**
     * Counts records per key by delegating to the non-windowed {@code aggregate}
     * with a zero initializer and an increment-by-one aggregator.
     *
     * @param storeName name of the backing state store; null is rejected by {@code aggregate}
     * @return table of the record count per key
     */
    @Override
    public KTable<K, Long> count(String storeName) {
        return this.aggregate(this.countInitializer(), this.countAggregator(), Serdes.Long(), storeName);
    }

    /**
     * Counts records per key and window by delegating to the windowed {@code aggregate}
     * with a zero initializer and an increment-by-one aggregator.
     *
     * @param windows   window specification; null is rejected by {@code aggregate}
     * @param storeName name of the backing state store; null is rejected by {@code aggregate}
     * @return table of the record count per windowed key
     */
    @Override
    public <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows, String storeName) {
        return this.aggregate(this.countInitializer(), this.countAggregator(), windows, Serdes.Long(), storeName);
    }

    /** Builds the initializer shared by both {@code count} overloads: every count starts at zero. */
    private Initializer<Long> countInitializer() {
        return new Initializer<Long>() {
            @Override
            public Long apply() {
                return 0L;
            }
        };
    }

    /** Builds the aggregator shared by both {@code count} overloads: adds one per record, ignoring key and value. */
    private Aggregator<K, V, Long> countAggregator() {
        return new Aggregator<K, V, Long>() {
            @Override
            public Long apply(K aggKey, V value, Long aggregate) {
                return aggregate + 1L;
            }
        };
    }

    /** Builds a persistent, caching key-value store supplier for the given value serde and store name. */
    private <T> StateStoreSupplier keyValueStore(Serde<T> aggValueSerde, String name) {
        return this.storeFactory(aggValueSerde, name).build();
    }

    /**
     * Builds a persistent, caching windowed store supplier sized from {@code windows}.
     * NOTE(review): the trailing {@code false} is presumably the "retain duplicates"
     * flag of {@code windowed(...)} - confirm against the Stores API.
     */
    private <W extends Window, T> StateStoreSupplier windowedStore(Serde<T> aggValSerde, Windows<W> windows, String storeName) {
        return this.storeFactory(aggValSerde, storeName)
                .windowed(windows.size(), windows.maintainMs(), windows.segments, false)
                .build();
    }

    /** Common store factory: keyed by {@code keySerde}, persistent, with caching enabled. */
    private <T> Stores.PersistentKeyValueFactory<K, T> storeFactory(Serde<T> aggValueSerde, String storeName) {
        return Stores.create(storeName)
                .withKeys(this.keySerde)
                .withValues(aggValueSerde)
                .persistent()
                .enableCaching();
    }

    /**
     * Registers an aggregation processor and its state store with the topology
     * and returns the resulting table.
     *
     * @param aggregateSupplier processor supplier performing the aggregation
     * @param functionName      prefix used to generate the processor node name
     * @param storeSupplier     supplier of the state store attached to the processor
     * @return table backed by the newly added processor and store
     */
    private <T> KTable<K, T> doAggregate(KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier, String functionName, StateStoreSupplier storeSupplier) {
        String aggFunctionName = this.topology.newName(functionName);
        String sourceName = this.repartitionIfRequired(storeSupplier.name());
        this.topology.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
        this.topology.addStateStore(storeSupplier, aggFunctionName);
        // If no repartitioning happened the table keeps this stream's source nodes;
        // otherwise its only source is the repartitioned node.
        return new KTableImpl<>(
                this.topology,
                aggFunctionName,
                aggregateSupplier,
                sourceName.equals(this.name) ? this.sourceNodes : Collections.singleton(sourceName),
                storeSupplier.name());
    }

    /**
     * Returns the node name the aggregation should read from: this stream's own
     * name when no repartitioning is required, otherwise the name returned by
     * {@code KStreamImpl.createReparitionedSource}.
     */
    private String repartitionIfRequired(String storeName) {
        if (!this.repartitionRequired) {
            return this.name;
        }
        return KStreamImpl.createReparitionedSource(this, this.keySerde, this.valSerde, storeName);
    }
}

