/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.core.construction;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Uninterruptibles;
import org.joda.time.Instant;

public class SplittableParDoNaiveBounded {

    static class NaiveProcessFn<InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
    extends DoFn<KV<InputT, RestrictionT>, OutputT> {
        private final DoFn<InputT, OutputT> fn;
        @Nullable
        private transient DoFnInvoker<InputT, OutputT> invoker;

        NaiveProcessFn(DoFn<InputT, OutputT> fn) {
            this.fn = fn;
        }

        @DoFn.Setup
        public void setup() {
            this.invoker = DoFnInvokers.invokerFor(this.fn);
            this.invoker.invokeSetup();
        }

        @DoFn.StartBundle
        public void startBundle(final DoFn.StartBundleContext c) {
            this.invoker.invokeStartBundle(new DoFn.StartBundleContext(){

                @Override
                public PipelineOptions getPipelineOptions() {
                    return c.getPipelineOptions();
                }
            });
        }

        @DoFn.ProcessElement
        public void process(DoFn.ProcessContext c, BoundedWindow w) {
            Object tracker;
            DoFn.ProcessContinuation continuation;
            Object element = ((KV)c.element()).getKey();
            Object restriction = ((KV)c.element()).getValue();
            while ((continuation = this.invoker.invokeProcessElement(new NestedProcessContext(this.fn, c, element, w, (RestrictionTracker)(tracker = this.invoker.invokeNewTracker(restriction)), null))).shouldResume()) {
                restriction = ((RestrictionTracker)tracker).checkpoint();
                Uninterruptibles.sleepUninterruptibly(continuation.resumeDelay().getMillis(), TimeUnit.MILLISECONDS);
            }
        }

        @DoFn.FinishBundle
        public void finishBundle(final DoFn.FinishBundleContext c) {
            this.invoker.invokeFinishBundle(new DoFn.FinishBundleContext(){

                @Override
                public PipelineOptions getPipelineOptions() {
                    return c.getPipelineOptions();
                }

                @Override
                public void output(@Nullable OutputT output, Instant timestamp, BoundedWindow window) {
                    throw new UnsupportedOperationException("Output from FinishBundle for SDF is not supported");
                }

                @Override
                public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
                    throw new UnsupportedOperationException("Output from FinishBundle for SDF is not supported");
                }
            });
        }

        @DoFn.Teardown
        public void teardown() {
            this.invoker.invokeTeardown();
        }

        private static class NestedProcessContext<InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
        extends DoFn.ProcessContext
        implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
            private final BoundedWindow window;
            private final DoFn.ProcessContext outerContext;
            private final InputT element;
            private final TrackerT tracker;

            private NestedProcessContext(DoFn<InputT, OutputT> fn, DoFn.ProcessContext outerContext, InputT element, BoundedWindow window, TrackerT tracker) {
                super(fn);
                this.window = window;
                this.outerContext = outerContext;
                this.element = element;
                this.tracker = tracker;
            }

            @Override
            public BoundedWindow window() {
                return this.window;
            }

            @Override
            public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
                return this.outerContext.pane();
            }

            @Override
            public PipelineOptions pipelineOptions() {
                return this.outerContext.getPipelineOptions();
            }

            @Override
            public DoFn.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
                return this;
            }

            @Override
            public DoFn.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
                throw new IllegalStateException();
            }

            @Override
            public InputT element(DoFn<InputT, OutputT> doFn) {
                return this.element;
            }

            @Override
            public Object schemaElement(DoFn<InputT, OutputT> doFn) {
                throw new UnsupportedOperationException();
            }

            @Override
            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
                return this.outerContext.timestamp();
            }

            @Override
            public DoFn.OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
                return new DoFn.OutputReceiver<OutputT>(){

                    @Override
                    public void output(OutputT output) {
                        outerContext.output(output);
                    }

                    @Override
                    public void outputWithTimestamp(OutputT output, Instant timestamp) {
                        outerContext.outputWithTimestamp(output, timestamp);
                    }
                };
            }

            @Override
            public DoFn.MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
                return new DoFn.MultiOutputReceiver(){

                    @Override
                    public <T> DoFn.OutputReceiver<T> get(final TupleTag<T> tag) {
                        return new DoFn.OutputReceiver<T>(){

                            @Override
                            public void output(T output) {
                                outerContext.output(tag, output);
                            }

                            @Override
                            public void outputWithTimestamp(T output, Instant timestamp) {
                                outerContext.outputWithTimestamp(tag, output, timestamp);
                            }
                        };
                    }

                    @Override
                    public <T> DoFn.OutputReceiver<Row> getRowReceiver(TupleTag<T> tag) {
                        throw new UnsupportedOperationException();
                    }
                };
            }

            @Override
            public RestrictionTracker<?, ?> restrictionTracker() {
                return this.tracker;
            }

            @Override
            public PipelineOptions getPipelineOptions() {
                return this.outerContext.getPipelineOptions();
            }

            @Override
            public void output(OutputT output) {
                this.outerContext.output(output);
            }

            @Override
            public void outputWithTimestamp(OutputT output, Instant timestamp) {
                this.outerContext.outputWithTimestamp(output, timestamp);
            }

            @Override
            public <T> void output(TupleTag<T> tag, T output) {
                this.outerContext.output(tag, output);
            }

            @Override
            public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
                this.outerContext.outputWithTimestamp(tag, output, timestamp);
            }

            @Override
            public InputT element() {
                return this.element;
            }

            @Override
            public <T> T sideInput(PCollectionView<T> view) {
                return this.outerContext.sideInput(view);
            }

            @Override
            public Instant timestamp() {
                return this.outerContext.timestamp();
            }

            @Override
            public PaneInfo pane() {
                return this.outerContext.pane();
            }

            @Override
            public void updateWatermark(Instant watermark) {
            }

            @Override
            public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
                throw new IllegalStateException();
            }

            @Override
            public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
                throw new IllegalStateException();
            }

            @Override
            public DoFn.OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
                throw new UnsupportedOperationException();
            }

            @Override
            public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
                throw new IllegalStateException();
            }

            @Override
            public State state(String stateId) {
                throw new UnsupportedOperationException();
            }

            @Override
            public Timer timer(String timerId) {
                throw new UnsupportedOperationException();
            }

            /* synthetic */ NestedProcessContext(DoFn x0, DoFn.ProcessContext x1, Object x2, BoundedWindow x3, RestrictionTracker x4, 1 x5) {
                this(x0, x1, x2, x3, x4);
            }
        }
    }

    /**
     * Composite transform that replaces a {@link SplittableParDo.ProcessKeyedElements}: it drops
     * the {@code byte[]} key, reshuffles the remaining {@code KV<element, restriction>} pairs and
     * processes them with a {@link NaiveProcessFn} wrapping the original fn.
     *
     * @param <InputT> element type of the original fn
     * @param <OutputT> main output type of the original fn
     * @param <RestrictionT> restriction type paired with each element
     * @param <TrackerT> tracker type used by the {@link NaiveProcessFn}
     */
    static class SplittableProcessNaive<InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
            extends PTransform<PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple> {
        /** The transform being replaced; supplies the fn, side inputs and output tags. */
        private final SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT> original;

        SplittableProcessNaive(SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT> original) {
            this.original = original;
        }

        /**
         * Applies "Drop key", "Reshuffle" and "NaiveProcess" in that order.
         *
         * @param input keyed element/restriction pairs
         * @return the tagged outputs of the "NaiveProcess" {@link ParDo}, which uses the original
         *     transform's side inputs, main output tag and additional output tags
         */
        @Override
        public PCollectionTuple expand(PCollection<KV<byte[], KV<InputT, RestrictionT>>> input) {
            PCollection<KV<InputT, RestrictionT>> unkeyed =
                    input.apply("Drop key", Values.<KV<InputT, RestrictionT>>create());
            PCollection<KV<InputT, RestrictionT>> reshuffled =
                    unkeyed.apply("Reshuffle", Reshuffle.<InputT, RestrictionT>of());
            return reshuffled.apply(
                    "NaiveProcess",
                    ParDo.of(new NaiveProcessFn<InputT, OutputT, RestrictionT, TrackerT>(this.original.getFn()))
                            .withSideInputs(this.original.getSideInputs())
                            .withOutputTags(this.original.getMainOutputTag(), this.original.getAdditionalOutputTags()));
        }
    }

    /**
     * Override factory that swaps a {@link SplittableParDo.ProcessKeyedElements} for a
     * {@link SplittableProcessNaive} built from it.
     *
     * @param <InputT> element type of the splittable fn
     * @param <OutputT> main output type of the splittable fn
     * @param <RestrictionT> restriction type paired with each element
     */
    public static class OverrideFactory<InputT, OutputT, RestrictionT>
            implements PTransformOverrideFactory<PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple, SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT>> {

        /**
         * Builds the replacement for {@code transform}.
         *
         * @param transform the applied transform being overridden
         * @return a replacement pairing the transform's singleton main input with a
         *     {@link SplittableProcessNaive} wrapping the original transform
         * @throws IllegalArgumentException if the fn's signature is not reported as bounded per
         *     element
         */
        @Override
        public PTransformOverrideFactory.PTransformReplacement<PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple> getReplacementTransform(AppliedPTransform<PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple, SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT>> transform) {
            SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT> processKeyed = transform.getTransform();
            boolean boundedPerElement =
                    DoFnSignatures.signatureForDoFn(processKeyed.getFn()).isBoundedPerElement() == PCollection.IsBounded.BOUNDED;
            Preconditions.checkArgument(boundedPerElement, "Expecting a bounded-per-element splittable DoFn");

            PCollection<KV<byte[], KV<InputT, RestrictionT>>> mainInput =
                    PTransformReplacements.getSingletonMainInput(transform);
            return PTransformOverrideFactory.PTransformReplacement.of(
                    mainInput, new SplittableProcessNaive<>(transform.getTransform()));
        }

        /** Maps the original outputs onto {@code newOutput} by tag via {@link ReplacementOutputs#tagged}. */
        @Override
        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
            Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapped = ReplacementOutputs.tagged(outputs, newOutput);
            return mapped;
        }
    }
}

