/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.core;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.SplittableProcessElementInvoker;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Instant;

public class SplittableParDoViaKeyedWorkItems {

    @VisibleForTesting
    public static class ProcessFn<InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
    extends DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> {
        /** System state tag (id {@code "hold"}) for the watermark hold, combined with {@link TimestampCombiner#LATEST}. */
        private static final StateTag<WatermarkHoldState> watermarkHoldTag = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("hold", TimestampCombiner.LATEST));

        /** Value-state tag (id {@code "element"}) under which the windowed input element is stored. */
        private final StateTag<ValueState<WindowedValue<InputT>>> elementTag;

        /**
         * Value-state tag (id {@code "restriction"}) under which the residual restriction is stored.
         * Assigned exactly once, in the constructor, hence {@code final} like {@link #elementTag}.
         */
        private final StateTag<ValueState<RestrictionT>> restrictionTag;

        /** The wrapped user {@link DoFn} whose lifecycle methods are driven through {@link #invoker}. */
        private final DoFn<InputT, OutputT> fn;

        /** Coder for input elements; also used to build the coder behind {@link #elementTag}. */
        private final Coder<InputT> elementCoder;

        /** Coder for restrictions; backs {@link #restrictionTag}. */
        private final Coder<RestrictionT> restrictionCoder;

        /** Windowing strategy of the input; supplies the window coder used for state namespaces. */
        private final WindowingStrategy<InputT, ?> inputWindowingStrategy;

        /** Set through {@code setStateInternalsFactory}; transient, and dereferenced by {@code processElement}. */
        @Nullable
        private transient StateInternalsFactory<byte[]> stateInternalsFactory;

        /** Set through {@code setTimerInternalsFactory}; transient, and dereferenced by {@code processElement}. */
        @Nullable
        private transient TimerInternalsFactory<byte[]> timerInternalsFactory;

        /** Set through {@code setProcessElementInvoker}; transient, and dereferenced by {@code processElement}. */
        @Nullable
        private transient SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT> processElementInvoker;

        /** Invoker for {@link #fn}; created in {@code setup()} and therefore null before it runs. */
        @Nullable
        private transient DoFnInvoker<InputT, OutputT> invoker;

        public ProcessFn(DoFn<InputT, OutputT> fn, Coder<InputT> elementCoder, Coder<RestrictionT> restrictionCoder, WindowingStrategy<InputT, ?> inputWindowingStrategy) {
            this.fn = fn;
            this.elementCoder = elementCoder;
            this.restrictionCoder = restrictionCoder;
            this.inputWindowingStrategy = inputWindowingStrategy;
            this.elementTag = StateTags.value("element", WindowedValue.getFullCoder(elementCoder, inputWindowingStrategy.getWindowFn().windowCoder()));
            this.restrictionTag = StateTags.value("restriction", restrictionCoder);
        }

        /**
         * Sets the factory that {@code processElement} uses to obtain the {@link StateInternals}
         * for a work item's key. The field is transient and is dereferenced without a null check,
         * so this must be called before elements are processed.
         *
         * @param stateInternalsFactory factory keyed by the work item's {@code byte[]} key
         */
        public void setStateInternalsFactory(StateInternalsFactory<byte[]> stateInternalsFactory) {
            this.stateInternalsFactory = stateInternalsFactory;
        }

        /**
         * Sets the factory that {@code processElement} uses to obtain the {@link TimerInternals}
         * for a work item's key. The field is transient and is dereferenced without a null check,
         * so this must be called before elements are processed.
         *
         * @param timerInternalsFactory factory keyed by the work item's {@code byte[]} key
         */
        public void setTimerInternalsFactory(TimerInternalsFactory<byte[]> timerInternalsFactory) {
            this.timerInternalsFactory = timerInternalsFactory;
        }

        /**
         * Sets the {@link SplittableProcessElementInvoker} that {@code processElement} delegates
         * the actual element processing to. Note the parameter is named {@code invoker} but is
         * stored in {@code processElementInvoker}, not in the {@code invoker} field.
         *
         * @param invoker the invoker used to run the wrapped function on an element and tracker
         */
        public void setProcessElementInvoker(SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT> invoker) {
            this.processElementInvoker = invoker;
        }

        /**
         * Returns the wrapped user function.
         *
         * @return the {@link DoFn} passed to the constructor
         */
        public DoFn<InputT, OutputT> getFn() {
            return fn;
        }

        /**
         * Returns the coder for input elements.
         *
         * @return the element coder passed to the constructor
         */
        public Coder<InputT> getElementCoder() {
            return elementCoder;
        }

        /**
         * Returns the coder for restrictions.
         *
         * @return the restriction coder passed to the constructor
         */
        public Coder<RestrictionT> getRestrictionCoder() {
            return restrictionCoder;
        }

        /**
         * Returns the windowing strategy of the input.
         *
         * @return the windowing strategy passed to the constructor
         */
        public WindowingStrategy<InputT, ?> getInputWindowingStrategy() {
            return inputWindowingStrategy;
        }

        /**
         * Creates the {@link DoFnInvoker} for the wrapped function and forwards the setup call to it.
         *
         * @throws Exception declared by this lifecycle method
         */
        @DoFn.Setup
        public void setup() throws Exception {
            // Assign the field first so later lifecycle calls can reach the invoker.
            invoker = DoFnInvokers.invokerFor(fn);
            invoker.invokeSetup();
        }

        /**
         * Forwards the teardown call to the wrapped function's invoker.
         *
         * <p>The invoker is only created in {@code setup()}. If teardown is reached without a
         * successfully created invoker there is nothing to tear down, so the call is skipped
         * instead of failing with a {@link NullPointerException} that would hide the earlier error.
         *
         * @throws Exception declared by this lifecycle method
         */
        @DoFn.Teardown
        public void tearDown() throws Exception {
            if (this.invoker == null) {
                return;
            }
            this.invoker.invokeTeardown();
        }

        /**
         * Forwards the start-bundle call to the wrapped function, handing it a context that is
         * bound to the wrapped function rather than to this {@code ProcessFn}.
         *
         * @param c this function's start-bundle context (the parameterized inherited type, not the
         *     raw {@code DoFn.StartBundleContext})
         * @throws Exception declared by this lifecycle method
         */
        @DoFn.StartBundle
        public void startBundle(StartBundleContext c) throws Exception {
            this.invoker.invokeStartBundle(this.wrapContextAsStartBundle(c));
        }

        /**
         * Forwards the finish-bundle call to the wrapped function, handing it a context that is
         * bound to the wrapped function and that rejects output.
         *
         * @param c this function's finish-bundle context (the parameterized inherited type, not the
         *     raw {@code DoFn.FinishBundleContext})
         * @throws Exception declared by this lifecycle method
         */
        @DoFn.FinishBundle
        public void finishBundle(FinishBundleContext c) throws Exception {
            this.invoker.invokeFinishBundle(this.wrapContextAsFinishBundle(c));
        }

        /**
         * Processes one keyed work item, which is either a "seed" call or a timer firing.
         *
         * <ul>
         *   <li>Seed call (the work item has no timer): the single element carries the
         *       element/restriction pair. The element is written to state, and the state namespace
         *       is derived from the element's single window.
         *   <li>Timer firing: the state namespace comes from the timer, and the element and
         *       restriction are read back from state.
         * </ul>
         *
         * <p>The pair is then handed to the process-element invoker. If no residual restriction is
         * returned, the element, restriction and watermark-hold state are cleared. Otherwise the
         * residual is stored, a watermark hold is added, and a processing-time timer is set to
         * resume after the continuation's resume delay.
         *
         * @param c context whose element is the keyed work item to process
         * @throws NullPointerException if the state/timer factories or the process-element invoker
         *     have not been set
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext c) {
            KeyedWorkItem<byte[], KV<InputT, RestrictionT>> workItem = c.element();
            byte[] key = workItem.key();
            StateInternals stateInternals = Objects.requireNonNull(this.stateInternalsFactory, "stateInternalsFactory is not set; call setStateInternalsFactory before processing elements").stateInternalsForKey(key);
            TimerInternals timerInternals = Objects.requireNonNull(this.timerInternalsFactory, "timerInternalsFactory is not set; call setTimerInternalsFactory before processing elements").timerInternalsForKey(key);

            // A work item without a timer is the seed call; otherwise its single timer says which state to resume.
            TimerInternals.TimerData timer = Iterables.getOnlyElement(workItem.timersIterable(), null);
            boolean isSeedCall = timer == null;

            // Only populated on the seed call; looked up once and reused below.
            WindowedValue<KV<InputT, RestrictionT>> seedValue = null;
            StateNamespace stateNamespace;
            if (isSeedCall) {
                seedValue = Iterables.getOnlyElement(workItem.elementsIterable());
                BoundedWindow window = Iterables.getOnlyElement(seedValue.getWindows());
                // The strategy's window type is a wildcard here; the namespace only needs a coder for BoundedWindow.
                @SuppressWarnings("unchecked")
                Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) this.inputWindowingStrategy.getWindowFn().windowCoder();
                stateNamespace = StateNamespaces.window(windowCoder, window);
            } else {
                stateNamespace = timer.getNamespace();
            }

            ValueState<WindowedValue<InputT>> elementState = stateInternals.state(stateNamespace, this.elementTag);
            ValueState<RestrictionT> restrictionState = stateInternals.state(stateNamespace, this.restrictionTag);
            WatermarkHoldState holdState = stateInternals.state(stateNamespace, watermarkHoldTag);

            WindowedValue<InputT> element;
            RestrictionT restriction;
            if (isSeedCall) {
                // Strip the restriction off the value, keeping the windowing metadata of the seed.
                element = seedValue.withValue(seedValue.getValue().getKey());
                elementState.write(element);
                restriction = seedValue.getValue().getValue();
            } else {
                // Timer firing: request both reads up front, then fetch the stored pair.
                elementState.readLater();
                restrictionState.readLater();
                element = elementState.read();
                restriction = restrictionState.read();
            }

            TrackerT tracker = this.invoker.invokeNewTracker(restriction);
            SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT>.Result result = Objects.requireNonNull(this.processElementInvoker, "processElementInvoker is not set; call setProcessElementInvoker before processing elements").invokeProcessElement(this.invoker, element, tracker);

            RestrictionT residual = result.getResidualRestriction();
            if (residual == null) {
                // Nothing left to resume: drop the stored state and the watermark hold.
                elementState.clear();
                restrictionState.clear();
                holdState.clear();
                return;
            }

            // Persist the residual and schedule a resumption.
            restrictionState.write(residual);
            Instant futureOutputWatermark = result.getFutureOutputWatermark();
            if (futureOutputWatermark == null) {
                // No watermark reported: fall back to the element's own timestamp for the hold.
                futureOutputWatermark = element.getTimestamp();
            }
            Instant wakeupTime = timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
            holdState.add(futureOutputWatermark);
            timerInternals.setTimer(TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME));
        }

        /**
         * Builds a start-bundle context that belongs to the wrapped function and delegates
         * {@code getPipelineOptions()} to this function's own context.
         *
         * @param baseContext the context this {@code ProcessFn} was given
         * @return a context whose enclosing {@link DoFn} instance is the wrapped function
         */
        private DoFn<InputT, OutputT>.StartBundleContext wrapContextAsStartBundle(final StartBundleContext baseContext) {
            // StartBundleContext is an inner class of DoFn, so it must be created against an
            // enclosing instance; a null enclosing instance fails with a NullPointerException.
            return this.fn.new StartBundleContext() {

                @Override
                public PipelineOptions getPipelineOptions() {
                    return baseContext.getPipelineOptions();
                }
            };
        }

        /**
         * Builds a finish-bundle context that belongs to the wrapped function. It delegates
         * {@code getPipelineOptions()} to this function's own context and rejects both
         * {@code output} overloads with an {@link UnsupportedOperationException}.
         *
         * @param baseContext the context this {@code ProcessFn} was given
         * @return a context whose enclosing {@link DoFn} instance is the wrapped function
         */
        private DoFn<InputT, OutputT>.FinishBundleContext wrapContextAsFinishBundle(final FinishBundleContext baseContext) {
            // FinishBundleContext is an inner class of DoFn, so it must be created against an
            // enclosing instance; a null enclosing instance fails with a NullPointerException.
            return this.fn.new FinishBundleContext() {

                @Override
                public void output(OutputT output, Instant timestamp, BoundedWindow window) {
                    this.throwUnsupportedOutput();
                }

                @Override
                public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
                    this.throwUnsupportedOutput();
                }

                @Override
                public PipelineOptions getPipelineOptions() {
                    return baseContext.getPipelineOptions();
                }

                /** Shared failure path for both {@code output} overloads. */
                private void throwUnsupportedOutput() {
                    throw new UnsupportedOperationException(String.format("Splittable DoFn can only output from @%s", DoFn.ProcessElement.class.getSimpleName()));
                }
            };
        }
    }

    /**
     * Primitive transform that consumes keyed work items of element/restriction pairs and produces
     * the outputs of the original {@link SplittableParDo.ProcessKeyedElements}. All configuration
     * is read from that original transform.
     */
    public static class ProcessElements<InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
    extends PTransform<PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple> {
        /** The transform being replaced; source of the function, coders, tags and windowing. */
        private final SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT> original;

        /**
         * @param original the transform whose configuration this transform exposes
         */
        public ProcessElements(SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT> original) {
            this.original = original;
        }

        /**
         * Creates a {@link ProcessFn} for the given function using the original transform's
         * element coder, restriction coder and input windowing strategy.
         *
         * @param fn the function the new {@code ProcessFn} should wrap
         * @return a new, fully typed {@code ProcessFn}
         */
        public ProcessFn<InputT, OutputT, RestrictionT, TrackerT> newProcessFn(DoFn<InputT, OutputT> fn) {
            // Diamond instead of a raw type so the type arguments are checked by the compiler.
            return new ProcessFn<>(fn, this.original.getElementCoder(), this.original.getRestrictionCoder(), this.original.getInputWindowingStrategy());
        }

        /** @return the original transform's function */
        public DoFn<InputT, OutputT> getFn() {
            return this.original.getFn();
        }

        /** @return the original transform's side inputs */
        public List<PCollectionView<?>> getSideInputs() {
            return this.original.getSideInputs();
        }

        /** @return the original transform's main output tag */
        public TupleTag<OutputT> getMainOutputTag() {
            return this.original.getMainOutputTag();
        }

        /** @return the original transform's additional output tags */
        public TupleTagList getAdditionalOutputTags() {
            return this.original.getAdditionalOutputTags();
        }

        /**
         * Creates the primitive output for {@code input} from the original transform's function,
         * output tags, output coders and input windowing strategy.
         */
        @Override
        public PCollectionTuple expand(PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> input) {
            return SplittableParDo.ProcessKeyedElements.createPrimitiveOutputFor(input, this.original.getFn(), this.original.getMainOutputTag(), this.original.getAdditionalOutputTags(), this.original.getOutputTagsToCoders(), this.original.getInputWindowingStrategy());
        }
    }

    public static class SplittableProcessViaKeyedWorkItems<InputT, OutputT, RestrictionT>
    extends PTransform<PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple> {
        private final SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT> original;

        public SplittableProcessViaKeyedWorkItems(SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT> original) {
            this.original = original;
        }

        @Override
        public PCollectionTuple expand(PCollection<KV<byte[], KV<InputT, RestrictionT>>> input) {
            return (PCollectionTuple)((PCollection)input.apply(new GBKIntoKeyedWorkItems())).setCoder(KeyedWorkItemCoder.of(ByteArrayCoder.of(), ((KvCoder)input.getCoder()).getValueCoder(), input.getWindowingStrategy().getWindowFn().windowCoder())).apply(new ProcessElements(this.original));
        }
    }

    /**
     * Override factory that replaces a {@link SplittableParDo.ProcessKeyedElements} with a
     * {@link SplittableProcessViaKeyedWorkItems} built from it.
     */
    public static class OverrideFactory<InputT, OutputT, RestrictionT>
    implements PTransformOverrideFactory<PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple, SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT>> {
        /**
         * Pairs the applied transform's singleton main input with the replacement transform.
         *
         * @param transform the applied transform to replace
         * @return the replacement, bound to the original main input
         */
        @Override
        public PTransformOverrideFactory.PTransformReplacement<PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple> getReplacementTransform(AppliedPTransform<PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple, SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT>> transform) {
            PCollection<KV<byte[], KV<InputT, RestrictionT>>> mainInput = PTransformReplacements.getSingletonMainInput(transform);
            SplittableProcessViaKeyedWorkItems<InputT, OutputT, RestrictionT> replacement = new SplittableProcessViaKeyedWorkItems<>(transform.getTransform());
            return PTransformOverrideFactory.PTransformReplacement.of(mainInput, replacement);
        }

        /**
         * Maps the original outputs to the replacement's outputs by tag.
         *
         * @param outputs the original transform's outputs, keyed by tag
         * @param newOutput the replacement transform's output tuple
         * @return the tag-based mapping from original to replacement outputs
         */
        @Override
        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
            Map<PValue, PTransformOverrideFactory.ReplacementOutput> taggedOutputs = ReplacementOutputs.tagged(outputs, newOutput);
            return taggedOutputs;
        }
    }

    /**
     * Raw primitive transform that turns a keyed collection into a collection of
     * {@link KeyedWorkItem}s. It only declares the output collection; it cannot be serialized
     * to proto (see {@link #getSpec()}).
     */
    public static class GBKIntoKeyedWorkItems<KeyT, InputT>
    extends PTransformTranslation.RawPTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
        /**
         * Declares the primitive output: same pipeline and boundedness as the input, the global
         * default windowing strategy, and a work-item coder built from the input's key coder,
         * value coder and window coder.
         *
         * @throws ClassCastException if the input coder is not a {@link KvCoder}
         */
        @Override
        public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> input) {
            // Typed instead of raw so the key/value coders keep their element types.
            KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
            return PCollection.createPrimitiveOutputInternal(input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded(), KeyedWorkItemCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()));
        }

        /** @return the URN identifying this transform */
        @Override
        public String getUrn() {
            return "beam:runners_core:transforms:splittable_gbkikwi:v1";
        }

        /**
         * Always fails: this transform is not meant to be serialized to proto.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public RunnerApi.FunctionSpec getSpec() {
            throw new UnsupportedOperationException(String.format("%s should never be serialized to proto", this.getClass().getSimpleName()));
        }
    }
}

