package org.apache.kafka.streams.kstream.internals;

import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

/**
 * {@link SessionWindowedKStream} implementation that turns a grouped stream plus a
 * {@link SessionWindows} specification into session-windowed {@link KTable}s.
 *
 * <p>Every public operation ({@code count}, {@code aggregate}, {@code reduce}) comes in two flavours:
 * <ul>
 *   <li>without {@link Materialized}: a store name is generated from the internal builder and the
 *       store is created by {@link #storeBuilder(String, Serde)};</li>
 *   <li>with {@link Materialized}: the store is created by {@link #materialize(MaterializedInternal)}
 *       from the caller's settings.</li>
 * </ul>
 * In both cases the actual processor is a {@code KStreamSessionWindowAggregate} that is wired into the
 * topology through the {@code GroupedStreamAggregateBuilder} passed to the constructor.
 *
 * @param <K> key type of the grouped stream
 * @param <V> value type of the grouped stream
 */
public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implements SessionWindowedKStream<K, V> {
    /** Prefix for generated store names; also passed as the function name to the aggregate builder. */
    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";

    private final SessionWindows windows;
    private final Serde<K> keySerde;
    private final Serde<V> valSerde;
    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;

    /** Merges two session counts by adding them. */
    private final Merger<K, Long> countMerger = new Merger<K, Long>() {
        @Override
        public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) {
            return aggOne + aggTwo;
        }
    };

    /** Initializer used by {@code reduce}: starts from {@code null} so the first value is taken as-is. */
    private final Initializer<V> reduceInitializer = new Initializer<V>() {
        @Override
        public V apply() {
            return null;
        }
    };

    /**
     * Creates the session-windowed view of a grouped stream.
     *
     * @param windows          session window specification used for the aggregate processor and the store
     * @param builder          internal builder, forwarded to {@code AbstractStream}
     * @param sourceNodes      forwarded to {@code AbstractStream}
     * @param name             forwarded to {@code AbstractStream}
     * @param keySerde         key serde used for stores when the caller does not supply one
     * @param valSerde         value serde of the input stream
     * @param aggregateBuilder builder that wires the aggregate processor and its store into the topology
     */
    public SessionWindowedKStreamImpl(final SessionWindows windows,
                                      final InternalStreamsBuilder builder,
                                      final Set<String> sourceNodes,
                                      final String name,
                                      final Serde<K> keySerde,
                                      final Serde<V> valSerde,
                                      final GroupedStreamAggregateBuilder<K, V> aggregateBuilder) {
        super(builder, name, sourceNodes);
        this.windows = windows;
        this.keySerde = keySerde;
        this.valSerde = valSerde;
        this.aggregateBuilder = aggregateBuilder;
    }

    /**
     * Counts records per key and session, using a generated store name and a {@code Long} value serde.
     *
     * @return table of counts keyed by windowed key
     */
    @Override
    public KTable<Windowed<K>, Long> count() {
        return doAggregate(aggregateBuilder.countInitializer,
                           aggregateBuilder.countAggregator,
                           countMerger,
                           Serdes.Long());
    }

    /**
     * Counts records per key and session into the store described by {@code materialized}.
     *
     * @param materialized store configuration; must not be {@code null}
     * @return table of counts keyed by windowed key
     * @throws NullPointerException if {@code materialized} is {@code null}
     */
    @Override
    public KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(materialized, "materialized can't be null");
        // The wrapper is only used to read the configured value serde; the default is applied to the
        // caller's instance because that instance is what gets forwarded to aggregate(...) below.
        final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
        if (materializedInternal.valueSerde() == null) {
            materialized.withValueSerde(Serdes.Long());
        }
        return aggregate(aggregateBuilder.countInitializer,
                         aggregateBuilder.countAggregator,
                         countMerger,
                         materialized);
    }

    /**
     * Aggregates values per key and session using a generated store name.
     *
     * @param initializer supplies the initial aggregate; must not be {@code null}
     * @param aggregator  folds a record into the aggregate; must not be {@code null}
     * @param merger      merges the aggregates of two sessions; must not be {@code null}
     * @param <T>         aggregate type
     * @return table of aggregates keyed by windowed key
     * @throws NullPointerException if any argument is {@code null}
     */
    @SuppressWarnings("unchecked")
    @Override
    public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                final Aggregator<? super K, ? super V, T> aggregator,
                                                final Merger<? super K, T> merger) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(aggregator, "aggregator can't be null");
        Objects.requireNonNull(merger, "sessionMerger can't be null");
        // NOTE(review): the input value serde is reused for the aggregate store. The unchecked cast is
        // only type-correct when T is the same type as V; callers with a different aggregate type
        // presumably need the Materialized overload to supply a matching serde - confirm.
        return doAggregate(initializer, aggregator, merger, (Serde<T>) valSerde);
    }

    /**
     * Aggregates values per key and session into the store described by {@code materialized}.
     * If no key serde is configured on {@code materialized}, this stream's key serde is used.
     *
     * @param initializer  supplies the initial aggregate; must not be {@code null}
     * @param aggregator   folds a record into the aggregate; must not be {@code null}
     * @param merger       merges the aggregates of two sessions; must not be {@code null}
     * @param materialized store configuration; must not be {@code null}
     * @param <VR>         aggregate type
     * @return table of aggregates keyed by windowed key
     * @throws NullPointerException if any argument is {@code null}
     */
    @SuppressWarnings("unchecked")
    @Override
    public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                  final Aggregator<? super K, ? super V, VR> aggregator,
                                                  final Merger<? super K, VR> merger,
                                                  final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(aggregator, "aggregator can't be null");
        Objects.requireNonNull(merger, "sessionMerger can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal =
            new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
        if (materializedInternal.keySerde() == null) {
            materializedInternal.withKeySerde(keySerde);
        }
        // The builder is typed on the plain key, so the windowed-key result type needs an unchecked cast.
        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(
            new KStreamSessionWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator, merger),
            AGGREGATE_NAME,
            materialize(materializedInternal),
            true);
    }

    /**
     * Reduces values per key and session using a generated store name and this stream's value serde.
     *
     * @param reducer combines two values; must not be {@code null}
     * @return table of reduced values keyed by windowed key
     * @throws NullPointerException if {@code reducer} is {@code null}
     */
    @Override
    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
        Objects.requireNonNull(reducer, "reducer can't be null");
        // One adapter instance serves both as aggregator and as the basis of the session merger.
        final Aggregator<K, V, V> reduceAggregator = aggregatorForReducer(reducer);
        return doAggregate(reduceInitializer, reduceAggregator, mergerForAggregator(reduceAggregator), valSerde);
    }

    /**
     * Reduces values per key and session into the store described by {@code materialized}.
     *
     * @param reducer      combines two values; must not be {@code null}
     * @param materialized store configuration; must not be {@code null}
     * @return table of reduced values keyed by windowed key
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                         final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(reducer, "reducer can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        final Aggregator<K, V, V> reduceAggregator = aggregatorForReducer(reducer);
        return aggregate(reduceInitializer, reduceAggregator, mergerForAggregator(reduceAggregator), materialized);
    }

    /**
     * Builds the session store described by {@code materialized}. When no store supplier is configured,
     * a persistent session store is created from the store name and {@code windows.maintainMs()}.
     * Logging is explicitly enabled or disabled according to {@code materialized}; caching is only
     * switched on when requested.
     */
    private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materialized) {
        SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) materialized.storeSupplier();
        if (supplier == null) {
            supplier = Stores.persistentSessionStore(materialized.storeName(), windows.maintainMs());
        }
        final StoreBuilder<SessionStore<K, VR>> sessionStoreBuilder =
            Stores.sessionStoreBuilder(supplier, materialized.keySerde(), materialized.valueSerde());

        if (materialized.loggingEnabled()) {
            sessionStoreBuilder.withLoggingEnabled(materialized.logConfig());
        } else {
            sessionStoreBuilder.withLoggingDisabled();
        }

        if (materialized.cachingEnabled()) {
            sessionStoreBuilder.withCachingEnabled();
        }
        return sessionStoreBuilder;
    }

    /**
     * Adapts an aggregator into a session merger: the second session's aggregate is fed to the
     * aggregator as the "value" and the first session's aggregate as the running aggregate.
     */
    private Merger<K, V> mergerForAggregator(final Aggregator<K, V, V> aggregator) {
        return new Merger<K, V>() {
            @Override
            public V apply(final K aggKey, final V aggOne, final V aggTwo) {
                return aggregator.apply(aggKey, aggTwo, aggOne);
            }
        };
    }

    /**
     * Adapts a reducer into an aggregator. A {@code null} running aggregate means nothing has been
     * reduced yet, so the incoming value is returned unchanged; otherwise the reducer is applied as
     * {@code reducer.apply(aggregate, value)}.
     */
    private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer) {
        return new Aggregator<K, V, V>() {
            @Override
            public V apply(final K aggKey, final V value, final V aggregate) {
                if (aggregate == null) {
                    return value;
                }
                return reducer.apply(aggregate, value);
            }
        };
    }

    /**
     * Builds a persistent, caching-enabled session store with this stream's key serde and the given
     * value serde.
     */
    private <VR> StoreBuilder<SessionStore<K, VR>> storeBuilder(final String storeName, final Serde<VR> aggValueSerde) {
        return Stores.sessionStoreBuilder(
                Stores.persistentSessionStore(storeName, windows.maintainMs()),
                keySerde,
                aggValueSerde)
            .withCachingEnabled();
    }

    /**
     * Shared path for the overloads without {@link Materialized}: generates a store name, creates the
     * session aggregate processor and hands both to the aggregate builder (with {@code false} as the
     * final flag, as opposed to {@code true} on the materialized path).
     */
    @SuppressWarnings("unchecked")
    private <VR> KTable<Windowed<K>, VR> doAggregate(final Initializer<VR> initializer,
                                                     final Aggregator<? super K, ? super V, VR> aggregator,
                                                     final Merger<? super K, VR> merger,
                                                     final Serde<VR> serde) {
        final String storeName = builder.newStoreName(AGGREGATE_NAME);
        // The builder is typed on the plain key, so the windowed-key result type needs an unchecked cast.
        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(
            new KStreamSessionWindowAggregate<>(windows, storeName, initializer, aggregator, merger),
            AGGREGATE_NAME,
            storeBuilder(storeName, serde),
            false);
    }
}
