/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.repackaged.runners.core;

import org.apache.beam.runners.direct.repackaged.runners.core.MergingStateAccessor;
import org.apache.beam.runners.direct.repackaged.runners.core.ReduceFn;
import org.apache.beam.runners.direct.repackaged.runners.core.StateAccessor;
import org.apache.beam.runners.direct.repackaged.runners.core.StateMerging;
import org.apache.beam.runners.direct.repackaged.runners.core.StateTag;
import org.apache.beam.runners.direct.repackaged.runners.core.StateTags;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.AppliedCombineFn;

/**
 * A {@link ReduceFn} that keeps everything it has seen in one {@link GroupingState} cell,
 * addressed by the {@link StateTag} handed to the constructor.
 *
 * <p>Each incoming value is {@code add}ed to that cell, and on trigger the cell's current
 * {@code read()} result is emitted as the output. Merge handling
 * ({@code prefetchOnMerge} / {@code onMerge}) is not implemented here; the two static factories
 * {@link #buffering} and {@link #combining} return anonymous subclasses that supply it through
 * {@link StateMerging}.
 *
 * <p>NOTE(review): this file is decompiler output. The context parameters below are written as
 * {@code ReduceFn<K, InputT, OutputT, W>.XxxContext} rather than the raw {@code ReduceFn.XxxContext}
 * so that {@code c.state()}, {@code c.value()} and {@code c.output(...)} keep their generic types.
 *
 * @param <K> key type
 * @param <InputT> type of the values added to the buffer
 * @param <AccumT> accumulator type (only appears in the signature of {@link #combining})
 * @param <OutputT> type read back from the buffer and emitted on trigger
 * @param <W> window type
 */
public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends BoundedWindow>
        extends ReduceFn<K, InputT, OutputT, W> {
    /** Identifier used for the buffer state tag created by both factories. */
    private static final String BUFFER_NAME = "buf";

    /** Address of the single state cell this function reads and writes; set once at construction. */
    private final StateTag<? extends GroupingState<InputT, OutputT>> bufferTag;

    /**
     * Creates a {@code SystemReduceFn} whose buffer is a {@link BagState} of the input values, so
     * the emitted output is an {@code Iterable<T>}.
     *
     * @param inputCoder coder passed to {@code StateTags.bag} for the buffered values
     * @return a reduce function that merges windows via {@code StateMerging.prefetchBags} and
     *     {@code StateMerging.mergeBags}
     */
    public static <K, T, W extends BoundedWindow> SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W> buffering(Coder<T> inputCoder) {
        final StateTag<BagState<T>> bufferTag =
                StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, inputCoder));
        return new SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>(bufferTag) {

            @Override
            public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
                StateMerging.prefetchBags(state, bufferTag);
            }

            @Override
            public void onMerge(ReduceFn<K, T, Iterable<T>, W>.OnMergeContext c) throws Exception {
                StateMerging.mergeBags(c.state(), bufferTag);
            }
        };
    }

    /**
     * Creates a {@code SystemReduceFn} whose buffer is a combining state built from
     * {@code combineFn}.
     *
     * <p>If {@code combineFn.getFn()} is a {@link CombineWithContext.CombineFnWithContext} the tag
     * is built with {@code StateTags.combiningValueWithContext}; otherwise the function is cast to
     * {@link Combine.CombineFn} and {@code StateTags.combiningValue} is used. In both cases the
     * accumulator coder comes from {@code combineFn.getAccumulatorCoder()}.
     *
     * @param keyCoder not used by this method; kept for signature compatibility
     * @param combineFn supplies the combine function and its accumulator coder
     * @return a reduce function that merges windows via
     *     {@code StateMerging.prefetchCombiningValues} and {@code StateMerging.mergeCombiningValues}
     */
    // The tag stays raw, exactly as in the decompiled original: both factory branches feed one
    // variable and its concrete state type is not among this file's imports. The resulting
    // unchecked conversions are confined to this method.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> SystemReduceFn<K, InputT, AccumT, OutputT, W> combining(Coder<K> keyCoder, AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
        final StateTag bufferTag;
        if (combineFn.getFn() instanceof CombineWithContext.CombineFnWithContext) {
            bufferTag = StateTags.makeSystemTagInternal(
                    StateTags.combiningValueWithContext(
                            BUFFER_NAME,
                            combineFn.getAccumulatorCoder(),
                            (CombineWithContext.CombineFnWithContext)combineFn.getFn()));
        } else {
            bufferTag = StateTags.makeSystemTagInternal(
                    StateTags.combiningValue(
                            BUFFER_NAME,
                            combineFn.getAccumulatorCoder(),
                            (Combine.CombineFn)combineFn.getFn()));
        }
        return new SystemReduceFn<K, InputT, AccumT, OutputT, W>(bufferTag) {

            @Override
            public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
                StateMerging.prefetchCombiningValues(state, bufferTag);
            }

            @Override
            public void onMerge(ReduceFn<K, InputT, OutputT, W>.OnMergeContext c) throws Exception {
                StateMerging.mergeCombiningValues(c.state(), bufferTag);
            }
        };
    }

    /**
     * Stores the tag that every other method uses to reach the buffer state.
     *
     * @param bufferTag address of the {@link GroupingState} that accumulates inputs
     */
    public SystemReduceFn(StateTag<? extends GroupingState<InputT, OutputT>> bufferTag) {
        this.bufferTag = bufferTag;
    }

    /** Adds the context's current value to the buffer state. */
    @Override
    public void processValue(ReduceFn<K, InputT, OutputT, W>.ProcessValueContext c) throws Exception {
        c.state().access(this.bufferTag).add(c.value());
    }

    /** Calls {@code readLater()} on the buffer state ahead of a trigger firing. */
    @Override
    public void prefetchOnTrigger(StateAccessor<K> state) {
        state.access(this.bufferTag).readLater();
    }

    /** Reads the buffer state and emits the result through {@code c.output(...)}. */
    @Override
    public void onTrigger(ReduceFn<K, InputT, OutputT, W>.OnTriggerContext c) throws Exception {
        c.output(c.state().access(this.bufferTag).read());
    }

    /** Clears the buffer state. */
    @Override
    public void clearState(ReduceFn<K, InputT, OutputT, W>.Context c) throws Exception {
        c.state().access(this.bufferTag).clear();
    }

    /**
     * Returns the buffer state's own emptiness indicator.
     *
     * @param state accessor used to look up the buffer state
     * @return the {@link ReadableState} produced by the buffer's {@code isEmpty()}
     */
    @Override
    public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
        return state.access(this.bufferTag).isEmpty();
    }
}

