/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.repackaged.runners.core;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.direct.repackaged.runners.core.MergingStateAccessor;
import org.apache.beam.runners.direct.repackaged.runners.core.StateTag;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/**
 * Static helpers that move state from a collection of "source" states into a single "result"
 * state.
 *
 * <p>The overloads taking a {@link MergingStateAccessor} obtain the sources from
 * {@code context.accessInEachMergingWindow(address).values()} and the result from
 * {@code context.access(address)}, then delegate to the collection-based overloads.
 */
public class StateMerging {
    /**
     * Clears the state stored under {@code address} in every entry returned by
     * {@code context.accessInEachMergingWindow(address)}.
     *
     * @param context accessor used to look up the per-window states
     * @param address tag identifying the state to clear
     */
    public static <K, StateT extends State, W extends BoundedWindow> void clear(MergingStateAccessor<K, W> context, StateTag<StateT> address) {
        Collection<StateT> states = context.accessInEachMergingWindow(address).values();
        for (StateT state : states) {
            state.clear();
        }
    }

    /**
     * Issues a {@code readLater()} hint on every bag under {@code address} except the one that
     * equals the result bag ({@code context.access(address)}).
     *
     * <p>Does nothing, and does not look up the result bag, when there are no per-window bags.
     *
     * @param context accessor used to look up the per-window bags and the result bag
     * @param address tag identifying the bag state
     */
    public static <K, T, W extends BoundedWindow> void prefetchBags(MergingStateAccessor<K, W> context, StateTag<BagState<T>> address) {
        Map<W, BagState<T>> bagsByWindow = context.accessInEachMergingWindow(address);
        if (bagsByWindow.isEmpty()) {
            return;
        }
        BagState<T> result = context.access(address);
        for (BagState<T> source : bagsByWindow.values()) {
            if (!source.equals(result)) {
                prefetchRead(source);
            }
        }
    }

    /**
     * Merges all bags under {@code address} into the bag returned by
     * {@code context.access(address)}.
     *
     * @param context accessor used to look up the source bags and the result bag
     * @param address tag identifying the bag state
     * @see #mergeBags(Collection, BagState)
     */
    public static <K, T, W extends BoundedWindow> void mergeBags(MergingStateAccessor<K, W> context, StateTag<BagState<T>> address) {
        // Sources are looked up before the result, as in the original call order.
        Collection<BagState<T>> sources = context.accessInEachMergingWindow(address).values();
        mergeBags(sources, context.access(address));
    }

    /**
     * Appends the contents of every bag in {@code sources} to {@code result}, then clears those
     * source bags. A source that equals {@code result} is neither read nor cleared.
     *
     * @param sources bags to drain; may be empty, in which case nothing happens
     * @param result bag receiving the elements
     */
    public static <T, W extends BoundedWindow> void mergeBags(Collection<BagState<T>> sources, BagState<T> result) {
        if (sources.isEmpty()) {
            return;
        }
        // Hint all reads up front so the actual reads below can be served together.
        List<BagState<T>> toMerge = new ArrayList<>(sources.size());
        for (BagState<T> source : sources) {
            if (!source.equals(result)) {
                prefetchRead(source);
                toMerge.add(source);
            }
        }
        if (toMerge.isEmpty()) {
            return;
        }
        for (BagState<T> source : toMerge) {
            for (T element : source.read()) {
                result.add(element);
            }
        }
        // Clear only after every source has been copied into the result.
        for (BagState<T> source : toMerge) {
            source.clear();
        }
    }

    /**
     * Merges all sets under {@code address} into the set returned by
     * {@code context.access(address)}.
     *
     * @param context accessor used to look up the source sets and the result set
     * @param address tag identifying the set state
     * @see #mergeSets(Collection, SetState)
     */
    public static <K, T, W extends BoundedWindow> void mergeSets(MergingStateAccessor<K, W> context, StateTag<SetState<T>> address) {
        // Sources are looked up before the result, as in the original call order.
        Collection<SetState<T>> sources = context.accessInEachMergingWindow(address).values();
        mergeSets(sources, context.access(address));
    }

    /**
     * Adds the contents of every set in {@code sources} to {@code result}, then clears those
     * source sets. A source that equals {@code result} is neither read nor cleared.
     *
     * @param sources sets to drain; may be empty, in which case nothing happens
     * @param result set receiving the elements
     */
    public static <T, W extends BoundedWindow> void mergeSets(Collection<SetState<T>> sources, SetState<T> result) {
        if (sources.isEmpty()) {
            return;
        }
        // Hint all reads up front so the actual reads below can be served together.
        List<SetState<T>> toMerge = new ArrayList<>(sources.size());
        for (SetState<T> source : sources) {
            if (!source.equals(result)) {
                prefetchRead(source);
                toMerge.add(source);
            }
        }
        if (toMerge.isEmpty()) {
            return;
        }
        for (SetState<T> source : toMerge) {
            for (T element : source.read()) {
                result.add(element);
            }
        }
        // Clear only after every source has been copied into the result.
        for (SetState<T> source : toMerge) {
            source.clear();
        }
    }

    /**
     * Issues a {@code readLater()} hint on every state under {@code address} returned by
     * {@code context.accessInEachMergingWindow(address)}.
     *
     * @param context accessor used to look up the per-window states
     * @param address tag identifying the grouping state
     */
    public static <K, StateT extends GroupingState<?, ?>, W extends BoundedWindow> void prefetchCombiningValues(MergingStateAccessor<K, W> context, StateTag<StateT> address) {
        for (GroupingState<?, ?> state : context.accessInEachMergingWindow(address).values()) {
            prefetchRead(state);
        }
    }

    /**
     * Merges all combining states under {@code address} into the state returned by
     * {@code context.access(address)}.
     *
     * @param context accessor used to look up the source states and the result state
     * @param address tag identifying the combining state
     * @see #mergeCombiningValues(Collection, CombiningState)
     */
    public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(MergingStateAccessor<K, W> context, StateTag<CombiningState<InputT, AccumT, OutputT>> address) {
        // Sources are looked up before the result, as in the original call order.
        Collection<CombiningState<InputT, AccumT, OutputT>> sources = context.accessInEachMergingWindow(address).values();
        mergeCombiningValues(sources, context.access(address));
    }

    /**
     * Merges the accumulators of all {@code sources} via {@code result.mergeAccumulators}, clears
     * every source, and adds the merged accumulator to {@code result}.
     *
     * <p>Nothing happens when {@code sources} is empty or when its only member is {@code result}.
     *
     * @param sources states whose accumulators are merged; all of them are cleared
     * @param result state that performs the merge and receives the merged accumulator
     */
    public static <InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(Collection<CombiningState<InputT, AccumT, OutputT>> sources, CombiningState<InputT, AccumT, OutputT> result) {
        if (sources.isEmpty()) {
            return;
        }
        if (sources.size() == 1 && sources.contains(result)) {
            return;
        }
        // Hint all reads before fetching any accumulator.
        for (CombiningState<InputT, AccumT, OutputT> source : sources) {
            prefetchRead(source);
        }
        List<AccumT> accumulators = new ArrayList<>(sources.size());
        for (CombiningState<InputT, AccumT, OutputT> source : sources) {
            accumulators.add(source.getAccum());
        }
        AccumT merged = result.mergeAccumulators(accumulators);
        // The result may itself be one of the sources, so clear first and add the merged
        // accumulator last; the reverse order would wipe the value just written.
        for (CombiningState<InputT, AccumT, OutputT> source : sources) {
            source.clear();
        }
        result.addAccum(merged);
    }

    /**
     * Issues a {@code readLater()} hint on every watermark hold under {@code address}, unless
     * {@link #mergeWatermarks(Collection, WatermarkHoldState, BoundedWindow)} would not read them:
     * there are no holds, the only hold is the result and the timestamp combiner depends only on
     * the earliest timestamp, or the combiner depends only on the window.
     *
     * @param context accessor used to look up the per-window holds and the result hold
     * @param address tag identifying the watermark hold state
     */
    public static <K, W extends BoundedWindow> void prefetchWatermarks(MergingStateAccessor<K, W> context, StateTag<WatermarkHoldState> address) {
        Map<W, WatermarkHoldState> holdsByWindow = context.accessInEachMergingWindow(address);
        WatermarkHoldState result = context.access(address);
        if (holdsByWindow.isEmpty()) {
            return;
        }
        if (holdsByWindow.size() == 1 && holdsByWindow.values().contains(result) && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
            return;
        }
        if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
            return;
        }
        for (WatermarkHoldState source : holdsByWindow.values()) {
            prefetchRead(source);
        }
    }

    /**
     * Calls {@code readLater()} on {@code state}; the returned value is ignored.
     *
     * @param state state to hint
     */
    private static void prefetchRead(ReadableState<?> state) {
        state.readLater();
    }

    /**
     * Merges all watermark holds under {@code address} into the hold returned by
     * {@code context.access(address)}.
     *
     * @param context accessor used to look up the source holds and the result hold
     * @param address tag identifying the watermark hold state
     * @param mergeResult window passed to the timestamp combiner when computing the new hold
     * @see #mergeWatermarks(Collection, WatermarkHoldState, BoundedWindow)
     */
    public static <K, W extends BoundedWindow> void mergeWatermarks(MergingStateAccessor<K, W> context, StateTag<WatermarkHoldState> address, W mergeResult) {
        // Sources are looked up before the result, as in the original call order.
        Collection<WatermarkHoldState> sources = context.accessInEachMergingWindow(address).values();
        mergeWatermarks(sources, context.access(address), mergeResult);
    }

    /**
     * Clears every hold in {@code sources} and sets {@code result} to a hold derived from them.
     *
     * <ul>
     *   <li>Nothing happens when {@code sources} is empty, or when its only member is
     *       {@code result} and the timestamp combiner depends only on the earliest timestamp.
     *   <li>When the combiner depends only on the window, the new hold is
     *       {@code assign(resultWindow, TIMESTAMP_MIN_VALUE)} and the sources are never read.
     *   <li>Otherwise the non-null values read from the sources are combined with
     *       {@code merge(resultWindow, ...)}; if every source read {@code null}, no hold is added.
     * </ul>
     *
     * @param sources holds to merge; all of them are cleared
     * @param result hold receiving the merged value; its timestamp combiner drives the merge
     * @param resultWindow window passed to the timestamp combiner
     * @throws IllegalStateException if the window-derived hold is not after
     *     {@code BoundedWindow.TIMESTAMP_MIN_VALUE}
     */
    public static <W extends BoundedWindow> void mergeWatermarks(Collection<WatermarkHoldState> sources, WatermarkHoldState result, W resultWindow) {
        if (sources.isEmpty()) {
            return;
        }
        if (sources.size() == 1 && sources.contains(result) && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
            return;
        }
        if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
            for (WatermarkHoldState source : sources) {
                source.clear();
            }
            Instant hold = result.getTimestampCombiner().assign(resultWindow, BoundedWindow.TIMESTAMP_MIN_VALUE);
            Preconditions.checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE), "Window-derived hold %s must be after %s", hold, BoundedWindow.TIMESTAMP_MIN_VALUE);
            result.add(hold);
            return;
        }
        // Hint all reads before performing any, matching the bag/set/combining merges.
        for (WatermarkHoldState source : sources) {
            prefetchRead(source);
        }
        List<Instant> outputTimesToMerge = new ArrayList<>(sources.size());
        for (WatermarkHoldState source : sources) {
            Instant sourceOutputTime = source.read();
            if (sourceOutputTime != null) {
                outputTimesToMerge.add(sourceOutputTime);
            }
        }
        // The result may be among the sources, so clear everything before writing the new hold.
        for (WatermarkHoldState source : sources) {
            source.clear();
        }
        if (!outputTimesToMerge.isEmpty()) {
            result.add(result.getTimestampCombiner().merge(resultWindow, outputTimesToMerge));
        }
    }
}

