/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.repackaged.runners.core;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.beam.runners.direct.repackaged.javax.annotation.Nullable;
import org.apache.beam.runners.direct.repackaged.runners.core.DoFnRunner;
import org.apache.beam.runners.direct.repackaged.runners.core.DoFnRunners;
import org.apache.beam.runners.direct.repackaged.runners.core.ExecutionContext;
import org.apache.beam.runners.direct.repackaged.runners.core.LateDataUtils;
import org.apache.beam.runners.direct.repackaged.runners.core.SideInputReader;
import org.apache.beam.runners.direct.repackaged.runners.core.StateNamespace;
import org.apache.beam.runners.direct.repackaged.runners.core.StateNamespaces;
import org.apache.beam.runners.direct.repackaged.runners.core.StateTags;
import org.apache.beam.runners.direct.repackaged.runners.core.TimerInternals;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.collect.Sets;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.PeriodFormat;

public class SimpleDoFnRunner<InputT, OutputT>
implements DoFnRunner<InputT, OutputT> {
    private final DoFn<InputT, OutputT> fn;
    private final DoFnInvoker<InputT, OutputT> invoker;
    private final DoFnContext<InputT, OutputT> context;
    private final DoFnRunners.OutputManager outputManager;
    private final TupleTag<OutputT> mainOutputTag;
    private final boolean observesWindow;
    private final DoFnSignature signature;
    private final Coder<BoundedWindow> windowCoder;
    private final Duration allowedLateness;
    private final ExecutionContext.StepContext stepContext;

    public SimpleDoFnRunner(PipelineOptions options, DoFn<InputT, OutputT> fn, SideInputReader sideInputReader, DoFnRunners.OutputManager outputManager, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, ExecutionContext.StepContext stepContext, WindowingStrategy<?, ?> windowingStrategy) {
        Coder untypedCoder;
        this.fn = fn;
        this.signature = DoFnSignatures.getSignature(fn.getClass());
        this.observesWindow = this.signature.processElement().observesWindow() || !sideInputReader.isEmpty();
        this.invoker = DoFnInvokers.invokerFor(fn);
        this.outputManager = outputManager;
        this.mainOutputTag = mainOutputTag;
        this.stepContext = stepContext;
        this.windowCoder = untypedCoder = windowingStrategy.getWindowFn().windowCoder();
        this.allowedLateness = windowingStrategy.getAllowedLateness();
        this.context = new DoFnContext<InputT, OutputT>(options, fn, sideInputReader, outputManager, mainOutputTag, additionalOutputTags, stepContext, windowingStrategy.getWindowFn());
    }

    @Override
    public void startBundle() {
        DoFnStartBundleContext<InputT, OutputT> startBundleContext = this.createStartBundleContext(this.fn, this.context);
        try {
            this.invoker.invokeStartBundle(startBundleContext);
        }
        catch (Throwable t) {
            throw this.wrapUserCodeException(t);
        }
    }

    @Override
    public void processElement(WindowedValue<InputT> compressedElem) {
        if (this.observesWindow) {
            for (WindowedValue elem : compressedElem.explodeWindows()) {
                this.invokeProcessElement(elem);
            }
        } else {
            this.invokeProcessElement(compressedElem);
        }
    }

    @Override
    public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
        Instant effectiveTimestamp;
        switch (timeDomain) {
            case EVENT_TIME: {
                effectiveTimestamp = timestamp;
                break;
            }
            case PROCESSING_TIME: 
            case SYNCHRONIZED_PROCESSING_TIME: {
                effectiveTimestamp = this.context.stepContext.timerInternals().currentInputWatermarkTime();
                break;
            }
            default: {
                throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain));
            }
        }
        OnTimerArgumentProvider argumentProvider = new OnTimerArgumentProvider(this.fn, this.context, window, this.allowedLateness, effectiveTimestamp, timeDomain);
        this.invoker.invokeOnTimer(timerId, argumentProvider);
    }

    private void invokeProcessElement(WindowedValue<InputT> elem) {
        DoFnProcessContext<InputT, OutputT> processContext = this.createProcessContext(elem);
        try {
            this.invoker.invokeProcessElement(processContext);
        }
        catch (Exception ex) {
            throw this.wrapUserCodeException(ex);
        }
    }

    @Override
    public void finishBundle() {
        DoFnFinishBundleContext<InputT, OutputT> finishBundleContext = this.createFinishBundleContext(this.fn, this.context);
        try {
            this.invoker.invokeFinishBundle(finishBundleContext);
        }
        catch (Throwable t) {
            throw this.wrapUserCodeException(t);
        }
    }

    private DoFnStartBundleContext<InputT, OutputT> createStartBundleContext(DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context) {
        return new DoFnStartBundleContext(this.fn, this.context);
    }

    private DoFnFinishBundleContext<InputT, OutputT> createFinishBundleContext(DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context) {
        return new DoFnFinishBundleContext(fn, context);
    }

    private DoFnProcessContext<InputT, OutputT> createProcessContext(WindowedValue<InputT> elem) {
        return new DoFnProcessContext(this.fn, this.context, elem, this.allowedLateness);
    }

    private RuntimeException wrapUserCodeException(Throwable t) {
        throw UserCodeException.wrapIf((!this.isSystemDoFn() ? 1 : 0) != 0, (Throwable)t);
    }

    private boolean isSystemDoFn() {
        return this.invoker.getClass().isAnnotationPresent(SystemDoFnInternal.class);
    }

    private static class TimerInternalsTimer
    implements Timer {
        private final TimerInternals timerInternals;
        private final BoundedWindow window;
        private final Duration allowedLateness;
        private final StateNamespace namespace;
        private final String timerId;
        private final TimerSpec spec;
        private Duration period = Duration.ZERO;
        private Duration offset = Duration.ZERO;

        public TimerInternalsTimer(BoundedWindow window, StateNamespace namespace, Duration allowedLateness, String timerId, TimerSpec spec, TimerInternals timerInternals) {
            this.window = window;
            this.allowedLateness = allowedLateness;
            this.namespace = namespace;
            this.timerId = timerId;
            this.spec = spec;
            this.timerInternals = timerInternals;
        }

        public void set(Instant target) {
            this.verifyAbsoluteTimeDomain();
            this.verifyTargetTime(target);
            this.setUnderlyingTimer(target);
        }

        public void setRelative() {
            long millisSinceStart;
            Instant now = this.getCurrentTime();
            Instant target = this.period.equals((Object)Duration.ZERO) ? now.plus((ReadableDuration)this.offset) : ((millisSinceStart = now.plus((ReadableDuration)this.offset).getMillis() % this.period.getMillis()) == 0L ? now : now.plus((ReadableDuration)this.period).minus(millisSinceStart));
            target = this.minTargetAndGcTime(target);
            this.setUnderlyingTimer(target);
        }

        public Timer offset(Duration offset) {
            this.offset = offset;
            return this;
        }

        public Timer align(Duration period) {
            this.period = period;
            return this;
        }

        private Instant minTargetAndGcTime(Instant target) {
            Instant windowExpiry;
            if (TimeDomain.EVENT_TIME.equals((Object)this.spec.getTimeDomain()) && target.isAfter((ReadableInstant)(windowExpiry = LateDataUtils.garbageCollectionTime(this.window, this.allowedLateness)))) {
                return windowExpiry;
            }
            return target;
        }

        private void verifyTargetTime(Instant target) {
            if (TimeDomain.EVENT_TIME.equals((Object)this.spec.getTimeDomain())) {
                Instant windowExpiry = this.window.maxTimestamp().plus((ReadableDuration)this.allowedLateness);
                Preconditions.checkArgument(!target.isAfter((ReadableInstant)windowExpiry), "Attempted to set event time timer for %s but that is after the expiration of window %s", (Object)target, (Object)windowExpiry);
            }
        }

        private void verifyAbsoluteTimeDomain() {
            if (!TimeDomain.EVENT_TIME.equals((Object)this.spec.getTimeDomain())) {
                throw new IllegalStateException("Cannot only set relative timers in processing time domain. Use #setRelative()");
            }
        }

        private void setUnderlyingTimer(Instant target) {
            this.timerInternals.setTimer(this.namespace, this.timerId, target, this.spec.getTimeDomain());
        }

        private Instant getCurrentTime() {
            switch (this.spec.getTimeDomain()) {
                case EVENT_TIME: {
                    return this.timerInternals.currentInputWatermarkTime();
                }
                case PROCESSING_TIME: {
                    return this.timerInternals.currentProcessingTime();
                }
                case SYNCHRONIZED_PROCESSING_TIME: {
                    return this.timerInternals.currentSynchronizedProcessingTime();
                }
            }
            throw new IllegalStateException(String.format("Timer created for unknown time domain %s", this.spec.getTimeDomain()));
        }
    }

    private class OnTimerArgumentProvider<InputT, OutputT>
    extends DoFn.OnTimerContext
    implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
        final DoFn<InputT, OutputT> fn;
        final DoFnContext<InputT, OutputT> context;
        private final BoundedWindow window;
        private final Instant timestamp;
        private final TimeDomain timeDomain;
        private final Duration allowedLateness;
        private StateNamespace namespace;

        private StateNamespace getNamespace() {
            if (this.namespace == null) {
                this.namespace = StateNamespaces.window(SimpleDoFnRunner.this.windowCoder, this.window);
            }
            return this.namespace;
        }

        private OnTimerArgumentProvider(DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context, BoundedWindow window, Duration allowedLateness, Instant timestamp, TimeDomain timeDomain) {
            super(fn);
            this.fn = fn;
            this.context = context;
            this.window = window;
            this.allowedLateness = allowedLateness;
            this.timestamp = timestamp;
            this.timeDomain = timeDomain;
        }

        public Instant timestamp() {
            return this.timestamp;
        }

        public BoundedWindow window() {
            return this.window;
        }

        public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("StartBundleContext parameters are not supported.");
        }

        public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("FinishBundleContext parameters are not supported.");
        }

        public TimeDomain timeDomain() {
            return this.timeDomain;
        }

        public DoFn.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("ProcessContext parameters are not supported.");
        }

        public DoFn.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
            return this;
        }

        public RestrictionTracker<?> restrictionTracker() {
            throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
        }

        public State state(String stateId) {
            try {
                StateSpec spec = (StateSpec)((DoFnSignature.StateDeclaration)SimpleDoFnRunner.this.signature.stateDeclarations().get(stateId)).field().get(this.fn);
                return SimpleDoFnRunner.this.stepContext.stateInternals().state(this.getNamespace(), StateTags.tagForSpec(stateId, spec));
            }
            catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
        }

        public Timer timer(String timerId) {
            try {
                TimerSpec spec = (TimerSpec)((DoFnSignature.TimerDeclaration)SimpleDoFnRunner.this.signature.timerDeclarations().get(timerId)).field().get(this.fn);
                return new TimerInternalsTimer(this.window, this.getNamespace(), this.allowedLateness, timerId, spec, SimpleDoFnRunner.this.stepContext.timerInternals());
            }
            catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
        }

        public PipelineOptions getPipelineOptions() {
            return this.context.getPipelineOptions();
        }

        public void output(OutputT output) {
            this.context.outputWindowedValue(output, this.timestamp(), Collections.singleton(this.window()), PaneInfo.NO_FIRING);
        }

        public void outputWithTimestamp(OutputT output, Instant timestamp) {
            this.context.outputWindowedValue(output, timestamp, Collections.singleton(this.window()), PaneInfo.NO_FIRING);
        }

        public <T> void output(TupleTag<T> tag, T output) {
            ((DoFnContext)this.context).outputWindowedValue(tag, output, this.timestamp, Collections.singleton(this.window()), PaneInfo.NO_FIRING);
        }

        public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
            ((DoFnContext)this.context).outputWindowedValue(tag, output, timestamp, Collections.singleton(this.window()), PaneInfo.NO_FIRING);
        }
    }

    private class DoFnProcessContext<InputT, OutputT>
    extends DoFn.ProcessContext
    implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
        final DoFn<InputT, OutputT> fn;
        final DoFnContext<InputT, OutputT> context;
        final WindowedValue<InputT> windowedValue;
        private final Duration allowedLateness;
        @Nullable
        private StateNamespace namespace;

        private StateNamespace getNamespace() {
            if (this.namespace == null) {
                this.namespace = StateNamespaces.window(SimpleDoFnRunner.this.windowCoder, this.window());
            }
            return this.namespace;
        }

        private DoFnProcessContext(DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context, WindowedValue<InputT> windowedValue, Duration allowedLateness) {
            super(fn);
            this.fn = fn;
            this.context = context;
            this.windowedValue = windowedValue;
            this.allowedLateness = allowedLateness;
        }

        public PipelineOptions getPipelineOptions() {
            return this.context.getPipelineOptions();
        }

        public InputT element() {
            return (InputT)this.windowedValue.getValue();
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public <T> T sideInput(PCollectionView<T> view) {
            BoundedWindow window;
            Preconditions.checkNotNull(view, "View passed to sideInput cannot be null");
            Iterator<BoundedWindow> windowIter = this.windows().iterator();
            if (!windowIter.hasNext()) {
                if (!(this.context.windowFn instanceof GlobalWindows)) throw new IllegalStateException("sideInput called when main input element is not in any windows");
                window = GlobalWindow.INSTANCE;
                return this.context.sideInput(view, view.getWindowMappingFn().getSideInputWindow(window));
            } else {
                window = windowIter.next();
                if (!windowIter.hasNext()) return this.context.sideInput(view, view.getWindowMappingFn().getSideInputWindow(window));
                throw new IllegalStateException("sideInput called when main input element is in multiple windows");
            }
        }

        public PaneInfo pane() {
            return this.windowedValue.getPane();
        }

        public void updateWatermark(Instant watermark) {
            throw new UnsupportedOperationException("Only splittable DoFn's can use updateWatermark()");
        }

        public void output(OutputT output) {
            this.context.outputWindowedValue(this.windowedValue.withValue(output));
        }

        public void outputWithTimestamp(OutputT output, Instant timestamp) {
            this.checkTimestamp(timestamp);
            this.context.outputWindowedValue(output, timestamp, this.windowedValue.getWindows(), this.windowedValue.getPane());
        }

        public <T> void output(TupleTag<T> tag, T output) {
            Preconditions.checkNotNull(tag, "Tag passed to output cannot be null");
            ((DoFnContext)this.context).outputWindowedValue(tag, this.windowedValue.withValue(output));
        }

        public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
            Preconditions.checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
            this.checkTimestamp(timestamp);
            ((DoFnContext)this.context).outputWindowedValue(tag, output, timestamp, this.windowedValue.getWindows(), this.windowedValue.getPane());
        }

        public Instant timestamp() {
            return this.windowedValue.getTimestamp();
        }

        public Collection<? extends BoundedWindow> windows() {
            return this.windowedValue.getWindows();
        }

        private void checkTimestamp(Instant timestamp) {
            if (this.fn.getAllowedTimestampSkew().getMillis() != Long.MAX_VALUE && timestamp.isBefore((ReadableInstant)this.windowedValue.getTimestamp().minus((ReadableDuration)this.fn.getAllowedTimestampSkew()))) {
                throw new IllegalArgumentException(String.format("Cannot output with timestamp %s. Output timestamps must be no earlier than the timestamp of the current input (%s) minus the allowed skew (%s). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.", timestamp, this.windowedValue.getTimestamp(), PeriodFormat.getDefault().print((ReadablePeriod)this.fn.getAllowedTimestampSkew().toPeriod())));
            }
        }

        public BoundedWindow window() {
            return (BoundedWindow)Iterables.getOnlyElement(this.windowedValue.getWindows());
        }

        public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("StartBundleContext parameters are not supported.");
        }

        public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("FinishBundleContext parameters are not supported.");
        }

        public DoFn.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
            return this;
        }

        public DoFn.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access OnTimerContext outside of @OnTimer methods.");
        }

        public RestrictionTracker<?> restrictionTracker() {
            throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
        }

        public State state(String stateId) {
            try {
                StateSpec spec = (StateSpec)((DoFnSignature.StateDeclaration)SimpleDoFnRunner.this.signature.stateDeclarations().get(stateId)).field().get(this.fn);
                return SimpleDoFnRunner.this.stepContext.stateInternals().state(this.getNamespace(), StateTags.tagForSpec(stateId, spec));
            }
            catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
        }

        public Timer timer(String timerId) {
            try {
                TimerSpec spec = (TimerSpec)((DoFnSignature.TimerDeclaration)SimpleDoFnRunner.this.signature.timerDeclarations().get(timerId)).field().get(this.fn);
                return new TimerInternalsTimer(this.window(), this.getNamespace(), this.allowedLateness, timerId, spec, SimpleDoFnRunner.this.stepContext.timerInternals());
            }
            catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private class DoFnFinishBundleContext<InputT, OutputT>
    extends DoFn.FinishBundleContext
    implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
        private final DoFnContext<InputT, OutputT> context;

        private DoFnFinishBundleContext(DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context) {
            super(fn);
            this.context = context;
        }

        public PipelineOptions getPipelineOptions() {
            return this.context.getPipelineOptions();
        }

        public BoundedWindow window() {
            throw new UnsupportedOperationException("Cannot access window outside of @ProcessElement and @OnTimer methods.");
        }

        public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access StartBundleContext outside of @StartBundle method.");
        }

        public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
            return this;
        }

        public DoFn.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access ProcessContext outside of @ProcessElement method.");
        }

        public DoFn.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access OnTimerContext outside of @OnTimer methods.");
        }

        public RestrictionTracker<?> restrictionTracker() {
            throw new UnsupportedOperationException("Cannot access RestrictionTracker outside of @ProcessElement method.");
        }

        public State state(String stateId) {
            throw new UnsupportedOperationException("Cannot access state outside of @ProcessElement and @OnTimer methods.");
        }

        public Timer timer(String timerId) {
            throw new UnsupportedOperationException("Cannot access timers outside of @ProcessElement and @OnTimer methods.");
        }

        public void output(OutputT output, Instant timestamp, BoundedWindow window) {
            this.context.outputWindowedValue(WindowedValue.of(output, (Instant)timestamp, (BoundedWindow)window, (PaneInfo)PaneInfo.NO_FIRING));
        }

        public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
            ((DoFnContext)this.context).outputWindowedValue(tag, WindowedValue.of(output, (Instant)timestamp, (BoundedWindow)window, (PaneInfo)PaneInfo.NO_FIRING));
        }
    }

    private class DoFnStartBundleContext<InputT, OutputT>
    extends DoFn.StartBundleContext
    implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
        private final DoFn<InputT, OutputT> fn;
        private final DoFnContext<InputT, OutputT> context;

        private DoFnStartBundleContext(DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context) {
            super(fn);
            this.fn = fn;
            this.context = context;
        }

        public PipelineOptions getPipelineOptions() {
            return this.context.getPipelineOptions();
        }

        public BoundedWindow window() {
            throw new UnsupportedOperationException("Cannot access window outside of @ProcessElement and @OnTimer methods.");
        }

        public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
            return this;
        }

        public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access FinishBundleContext outside of @FinishBundle method.");
        }

        public DoFn.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access ProcessContext outside of @ProcessElement method.");
        }

        public DoFn.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access OnTimerContext outside of @OnTimer methods.");
        }

        public RestrictionTracker<?> restrictionTracker() {
            throw new UnsupportedOperationException("Cannot access RestrictionTracker outside of @ProcessElement method.");
        }

        public State state(String stateId) {
            throw new UnsupportedOperationException("Cannot access state outside of @ProcessElement and @OnTimer methods.");
        }

        public Timer timer(String timerId) {
            throw new UnsupportedOperationException("Cannot access timers outside of @ProcessElement and @OnTimer methods.");
        }
    }

    private static class DoFnContext<InputT, OutputT> {
        private static final int MAX_SIDE_OUTPUTS = 1000;
        final PipelineOptions options;
        final DoFn<InputT, OutputT> fn;
        final SideInputReader sideInputReader;
        final DoFnRunners.OutputManager outputManager;
        final TupleTag<OutputT> mainOutputTag;
        final ExecutionContext.StepContext stepContext;
        final WindowFn<?, ?> windowFn;
        private Set<TupleTag<?>> outputTags;

        public DoFnContext(PipelineOptions options, DoFn<InputT, OutputT> fn, SideInputReader sideInputReader, DoFnRunners.OutputManager outputManager, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, ExecutionContext.StepContext stepContext, WindowFn<?, ?> windowFn) {
            this.options = options;
            this.fn = fn;
            this.sideInputReader = sideInputReader;
            this.outputManager = outputManager;
            this.mainOutputTag = mainOutputTag;
            this.outputTags = Sets.newHashSet();
            this.outputTags.add(mainOutputTag);
            for (TupleTag<?> additionalOutputTag : additionalOutputTags) {
                this.outputTags.add(additionalOutputTag);
            }
            this.stepContext = stepContext;
            this.windowFn = windowFn;
        }

        public PipelineOptions getPipelineOptions() {
            return this.options;
        }

        <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
            final Instant inputTimestamp = timestamp;
            if (timestamp == null) {
                timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
            }
            if (windows == null) {
                try {
                    WindowFn<?, ?> objectWindowFn;
                    WindowFn<?, ?> windowFn = objectWindowFn = this.windowFn;
                    windowFn.getClass();
                    windows = objectWindowFn.assignWindows(new WindowFn.AssignContext(windowFn){

                        public Object element() {
                            throw new UnsupportedOperationException("WindowFn attempted to access input element when none was available");
                        }

                        public Instant timestamp() {
                            if (inputTimestamp == null) {
                                throw new UnsupportedOperationException("WindowFn attempted to access input timestamp when none was available");
                            }
                            return inputTimestamp;
                        }

                        public W window() {
                            throw new UnsupportedOperationException("WindowFn attempted to access input windows when none were available");
                        }
                    });
                }
                catch (Exception e) {
                    throw UserCodeException.wrap((Throwable)e);
                }
            }
            return WindowedValue.of(output, (Instant)timestamp, windows, (PaneInfo)pane);
        }

        public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
            if (!this.sideInputReader.contains(view)) {
                throw new IllegalArgumentException("calling sideInput() with unknown view");
            }
            return this.sideInputReader.get(view, sideInputWindow);
        }

        void outputWindowedValue(OutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            this.outputWindowedValue(this.makeWindowedValue(output, timestamp, windows, pane));
        }

        void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
            this.outputManager.output(this.mainOutputTag, windowedElem);
            if (this.stepContext != null) {
                this.stepContext.noteOutput(windowedElem);
            }
        }

        private <T> void outputWindowedValue(TupleTag<T> tag, T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            this.outputWindowedValue(tag, this.makeWindowedValue(output, timestamp, windows, pane));
        }

        private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
            if (!this.outputTags.contains(tag)) {
                if (this.outputTags.size() >= 1000) {
                    throw new IllegalArgumentException("the number of outputs has exceeded a limit of 1000");
                }
                this.outputTags.add(tag);
            }
            this.outputManager.output(tag, windowedElem);
            if (this.stepContext != null) {
                this.stepContext.noteOutput(tag, windowedElem);
            }
        }
    }
}

