/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.fn.harness.MapFnRunners;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;

public abstract class WindowMergingFnRunner<T, W extends BoundedWindow> {
    static final String URN = BeamUrns.getUrn(RunnerApi.StandardPTransforms.Primitives.MERGE_WINDOWS);

    static <T, W extends BoundedWindow> ThrowingFunction<KV<T, Iterable<W>>, KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>>> createMapFunctionForPTransform(String ptransformId, RunnerApi.PTransform ptransform) throws IOException {
        RunnerApi.SdkFunctionSpec payload = RunnerApi.SdkFunctionSpec.parseFrom(ptransform.getSpec().getPayload());
        WindowFn<?, ?> windowFn = WindowingStrategyTranslation.windowFnFromProto(payload);
        return WindowMergingFnRunner.create(windowFn)::mergeWindows;
    }

    static <T, W extends BoundedWindow> WindowMergingFnRunner<T, W> create(WindowFn<?, W> windowFn) {
        if (windowFn.isNonMerging()) {
            return new NonMergingWindowFnRunner();
        }
        return new MergingViaWindowFnRunner(windowFn);
    }

    abstract KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(KV<T, Iterable<W>> var1) throws Exception;

    private static class MergingViaWindowFnRunner<T, W extends BoundedWindow>
    extends WindowMergingFnRunner<T, W> {
        private final WindowFn<T, W> windowFn;
        private final WindowFn.MergeContext mergeContext;
        private Collection<W> currentWindows;
        private List<KV<W, Collection<W>>> mergedWindows;

        private MergingViaWindowFnRunner(WindowFn<T, W> windowFn) {
            this.windowFn = windowFn;
            this.mergedWindows = new ArrayList<KV<W, Collection<W>>>();
            this.currentWindows = new ArrayList<W>();
            this.mergeContext = new WindowFn.MergeContext(windowFn){

                @Override
                public Collection<W> windows() {
                    return currentWindows;
                }

                @Override
                public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
                    mergedWindows.add(KV.of(mergeResult, toBeMerged));
                }
            };
        }

        @Override
        KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(KV<T, Iterable<W>> windowsToMerge) throws Exception {
            this.currentWindows = Sets.newHashSet(windowsToMerge.getValue());
            this.windowFn.mergeWindows(this.mergeContext);
            for (KV<W, Collection<W>> mergedWindow : this.mergedWindows) {
                this.currentWindows.removeAll(mergedWindow.getValue());
            }
            return KV.of(windowsToMerge.getKey(), KV.of(this.currentWindows, this.mergedWindows));
        }
    }

    private static class NonMergingWindowFnRunner<T, W extends BoundedWindow>
    extends WindowMergingFnRunner<T, W> {
        private NonMergingWindowFnRunner() {
        }

        @Override
        KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(KV<T, Iterable<W>> windowsToMerge) {
            return KV.of(windowsToMerge.getKey(), KV.of(windowsToMerge.getValue(), Collections.emptyList()));
        }
    }

    @AutoService(value=PTransformRunnerFactory.Registrar.class)
    public static class Registrar
    implements PTransformRunnerFactory.Registrar {
        @Override
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of(URN, MapFnRunners.forValueMapFnFactory(WindowMergingFnRunner::createMapFunctionForPTransform));
        }
    }
}

