/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.state.StoreBuilder;

class GroupedStreamAggregateBuilder<K, V> {
    private final InternalStreamsBuilder builder;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private final boolean repartitionRequired;
    private final Set<String> sourceNodes;
    private final String name;
    final Initializer<Long> countInitializer = new Initializer<Long>(){

        @Override
        public Long apply() {
            return 0L;
        }
    };
    final Aggregator<K, V, Long> countAggregator = new Aggregator<K, V, Long>(){

        @Override
        public Long apply(K aggKey, V value, Long aggregate) {
            return aggregate + 1L;
        }
    };

    GroupedStreamAggregateBuilder(InternalStreamsBuilder builder, Serde<K> keySerde, Serde<V> valueSerde, boolean repartitionRequired, Set<String> sourceNodes, String name) {
        this.builder = builder;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.repartitionRequired = repartitionRequired;
        this.sourceNodes = sourceNodes;
        this.name = name;
    }

    <T> KTable<K, T> build(KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier, String functionName, StoreBuilder storeBuilder, boolean isQueryable) {
        String aggFunctionName = this.builder.newProcessorName(functionName);
        String sourceName = this.repartitionIfRequired(storeBuilder.name());
        this.builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
        this.builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName);
        return new KTableImpl(this.builder, aggFunctionName, aggregateSupplier, sourceName.equals(this.name) ? this.sourceNodes : Collections.singleton(sourceName), storeBuilder.name(), isQueryable);
    }

    private String repartitionIfRequired(String queryableStoreName) {
        if (!this.repartitionRequired) {
            return this.name;
        }
        return KStreamImpl.createReparitionedSource(this.builder, this.keySerde, this.valueSerde, queryableStoreName, this.name);
    }
}

