/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Requirements;
import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;

/**
 * Factory for {@link OnSignal} transforms, which pass a main {@link PCollection} through
 * unchanged while declaring a set of "signal" collections as side inputs of that pass-through
 * step.
 *
 * <p>Each signal collection is reduced to a small {@link PCollectionView} (see
 * {@link ToWaitView}); the main input is then mapped with an identity function that requires
 * those views. NOTE(review): the intended effect, per Beam's side-input semantics, is that main
 * input elements are held back until the signals for their window are complete; confirm against
 * the Beam programming guide for the runner in use.
 */
@Experimental
public class Wait {
    /**
     * Creates a transform that waits on the given signal collections.
     *
     * @param signals the collections to wait on; wrapped as a list view of the given array
     * @param <T> element type of the main input the returned transform is applied to
     * @return a transform passing its input through once the signals are available
     */
    public static <T> OnSignal<T> on(PCollection<?> ... signals) {
        return Wait.on(Arrays.asList(signals));
    }

    /**
     * Creates a transform that waits on the given signal collections.
     *
     * @param signals the collections to wait on; the list is stored as given, not copied
     * @param <T> element type of the main input the returned transform is applied to
     * @return a transform passing its input through once the signals are available
     */
    public static <T> OnSignal<T> on(List<PCollection<?>> signals) {
        return new OnSignal<>(signals);
    }

    /**
     * Emits one {@code null} element per distinct window observed in a bundle.
     *
     * <p>Windows are gathered in a per-bundle set while elements are processed and are only
     * turned into output when the bundle finishes, so the number of outputs depends on the
     * number of windows in the bundle rather than the number of input elements.
     *
     * @param <T> element type of the signal collection; element values are never inspected
     */
    private static class CollectWindowsFn<T>
    extends DoFn<T, Void> {
        /** Windows seen in the current bundle; assigned in {@link #startBundle()}. */
        @Nullable
        private Set<BoundedWindow> windows;

        /** Starts a fresh window set for the new bundle. */
        @DoFn.StartBundle
        public void startBundle() {
            this.windows = Sets.newHashSetWithExpectedSize(1);
        }

        /**
         * Records the window of the current element; the element itself is ignored.
         *
         * @param c the process context (unused)
         * @param w the window the current element belongs to
         */
        @DoFn.ProcessElement
        public void process(ProcessContext c, BoundedWindow w) {
            this.windows.add(w);
        }

        /**
         * Outputs a single {@code null} into every window recorded during the bundle.
         *
         * @param c the finish-bundle context used to emit the per-window outputs
         */
        @DoFn.FinishBundle
        public void finishBundle(FinishBundleContext c) {
            for (BoundedWindow w : this.windows) {
                // The output type is Void, so the payload is null; the element is stamped with
                // the window's maximum timestamp and placed in that same window.
                c.output(null, w.maxTimestamp(), w);
            }
        }
    }

    /**
     * Converts a signal collection of any element type into a list-valued side-input view.
     */
    private static class ToWaitView
    extends PTransform<PCollection<?>, PCollectionView<?>> {
        @Override
        public PCollectionView<?> expand(PCollection<?> input) {
            // Delegate to a generic helper so the wildcard is captured as a named type variable.
            return this.expandTyped(input);
        }

        /**
         * Builds the view for a signal collection whose element type has been captured.
         *
         * @param input the signal collection
         * @param <SignalT> the captured element type of the signal collection
         * @return a list view over at most one {@code Void} sample per window
         */
        private <SignalT> PCollectionView<?> expandTyped(PCollection<SignalT> input) {
            return input
                // Re-trigger with Never.ever() and discard fired panes.
                .apply(Window.<SignalT>configure().triggering(Never.ever()).discardingFiredPanes())
                // Reduce each bundle to one null per window before sampling.
                .apply(ParDo.of(new CollectWindowsFn<SignalT>()))
                .apply(Sample.any(1L))
                .apply(View.asList());
        }
    }

    /**
     * Pass-through transform that declares the signal collections as side inputs.
     *
     * @param <T> element type of the main input and output
     */
    public static class OnSignal<T>
    extends PTransform<PCollection<T>, PCollection<T>> {
        /** Signal collections to wait on; transient, so not serialized with the transform. */
        private final transient List<PCollection<?>> signals;

        private OnSignal(List<PCollection<?>> signals) {
            this.signals = signals;
        }

        /**
         * Turns every signal into a view and applies an identity map that requires all of them.
         *
         * @param input the main collection to pass through
         * @return a collection with the same elements and the input coder's encoded type
         */
        @Override
        public PCollection<T> expand(PCollection<T> input) {
            List<PCollectionView<?>> views = Lists.newArrayList();
            for (int i = 0; i < this.signals.size(); ++i) {
                // The index keeps the applied transform names unique per signal.
                views.add(this.signals.get(i).apply("To wait view " + i, new ToWaitView()));
            }
            return input.apply(
                "Wait",
                MapElements.into(input.getCoder().getEncodedTypeDescriptor())
                    .via(Contextful.fn((t, c) -> t, Requirements.requiresSideInputs(views))));
        }
    }
}

