/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.fn.harness.DoFnPTransformRunnerFactory;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.state.FnApiStateAccessor;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnOutputReceivers;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class FnApiDoFnRunner<InputT, OutputT>
implements DoFnPTransformRunnerFactory.DoFnPTransformRunner<InputT> {
    private final DoFnPTransformRunnerFactory.Context<InputT, OutputT> context;
    private final Collection<FnDataReceiver<WindowedValue<OutputT>>> mainOutputConsumers;
    private FnApiStateAccessor stateAccessor;
    private final DoFnInvoker<InputT, OutputT> doFnInvoker;
    private final DoFn.StartBundleContext startBundleContext;
    private final ProcessBundleContext processContext;
    private final OnTimerContext onTimerContext;
    private final DoFn.FinishBundleContext finishBundleContext;
    private WindowedValue<InputT> currentElement;
    private BoundedWindow currentWindow;
    private WindowedValue<KV<Object, Timer>> currentTimer;
    private TimeDomain currentTimeDomain;
    private DoFnSchemaInformation doFnSchemaInformation;

    /**
     * Builds a runner around the {@link DoFn} carried by {@code context}.
     *
     * <p>The constructor resolves the main-output consumers, creates the invoker and immediately
     * calls {@code invokeSetup()} on it, and then prepares the four context objects passed to the
     * invoker for start-bundle, process-element, on-timer and finish-bundle calls.
     *
     * @param context the per-transform configuration (DoFn, options, consumers, payload) to run with
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    FnApiDoFnRunner(final DoFnPTransformRunnerFactory.Context<InputT, OutputT> context) {
        this.context = context;
        // The consumer registry is keyed by local output name and typed with a wildcard element;
        // narrowing it to the main output's element type needs an unchecked cast.
        this.mainOutputConsumers = (Collection<FnDataReceiver<WindowedValue<OutputT>>>) (Collection) context.localNameToConsumer.get(context.mainOutputTag.getId());
        this.doFnInvoker = DoFnInvokers.invokerFor(context.doFn);
        this.doFnInvoker.invokeSetup();

        // StartBundleContext is an inner class of DoFn, so it must be created through a DoFn
        // instance (qualified instance creation performs the null check on that instance).
        this.startBundleContext = this.context.doFn.new StartBundleContext() {

            @Override
            public PipelineOptions getPipelineOptions() {
                return context.pipelineOptions;
            }
        };
        this.processContext = new ProcessBundleContext();
        this.onTimerContext = new OnTimerContext();
        this.finishBundleContext = this.context.doFn.new FinishBundleContext() {

            @Override
            public PipelineOptions getPipelineOptions() {
                return context.pipelineOptions;
            }

            @Override
            public void output(OutputT output, Instant timestamp, BoundedWindow window) {
                FnApiDoFnRunner.this.outputTo(FnApiDoFnRunner.this.mainOutputConsumers, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
            }

            @Override
            public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
                Collection<FnDataReceiver<WindowedValue<T>>> consumers = (Collection) context.localNameToConsumer.get(tag.getId());
                if (consumers == null) {
                    throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
                }
                FnApiDoFnRunner.this.outputTo(consumers, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
            }
        };
        this.doFnSchemaInformation = ParDoTranslation.getSchemaInformation(context.parDoPayload);
    }

    /**
     * Starts a bundle: creates a new {@link FnApiStateAccessor} and then invokes the DoFn's
     * start-bundle method.
     *
     * <p>The two lambdas handed to the accessor read the runner's current element/timer and current
     * window fields each time they are called, rather than capturing a value here.
     */
    @Override
    public void startBundle() {
        stateAccessor =
            new FnApiStateAccessor(
                context.pipelineOptions,
                context.ptransformId,
                context.processBundleInstructionId,
                context.tagToSideInputSpecMap,
                context.beamFnStateClient,
                context.keyCoder,
                context.windowCoder,
                () -> MoreObjects.firstNonNull(currentElement, currentTimer),
                () -> currentWindow);
        doFnInvoker.invokeStartBundle(startBundleContext);
    }

    @Override
    public void processElement(WindowedValue<InputT> elem) {
        this.currentElement = elem;
        try {
            for (BoundedWindow this.currentWindow : elem.getWindows()) {
                this.doFnInvoker.invokeProcessElement(this.processContext);
            }
        }
        finally {
            this.currentElement = null;
            this.currentWindow = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void processTimer(String timerId, TimeDomain timeDomain, WindowedValue<KV<Object, Timer>> timer) {
        this.currentTimer = timer;
        this.currentTimeDomain = timeDomain;
        try {
            for (BoundedWindow this.currentWindow : timer.getWindows()) {
                this.doFnInvoker.invokeOnTimer(timerId, this.onTimerContext);
            }
        }
        finally {
            this.currentTimer = null;
            this.currentTimeDomain = null;
            this.currentWindow = null;
        }
    }

    /**
     * Finishes the bundle: invokes the DoFn's finish-bundle method, finalizes the state accessor,
     * and then drops the reference to it.
     */
    @Override
    public void finishBundle() {
        doFnInvoker.invokeFinishBundle(finishBundleContext);

        FnApiStateAccessor accessor = stateAccessor;
        accessor.finalizeState();
        stateAccessor = null;
    }

    /**
     * Delivers {@code output} to every consumer in {@code consumers}, in iteration order.
     *
     * @param consumers the receivers registered for one output
     * @param output the windowed value to hand to each receiver
     * @throws UserCodeException wrapping anything thrown by a receiver
     */
    private <T> void outputTo(Collection<FnDataReceiver<WindowedValue<T>>> consumers, WindowedValue<T> output) {
        try {
            // The element type of the collection is FnDataReceiver<WindowedValue<T>>; the loop
            // variable must match it exactly for accept(output) to type-check.
            for (FnDataReceiver<WindowedValue<T>> consumer : consumers) {
                consumer.accept(output);
            }
        }
        catch (Throwable t) {
            throw UserCodeException.wrap(t);
        }
    }

    /**
     * The {@link DoFn.OnTimerContext} and {@link DoFnInvoker.ArgumentProvider} passed to the DoFn
     * invoker while a timer is being processed.
     *
     * <p>All values are read from the enclosing runner's {@code currentTimer}, {@code currentWindow}
     * and {@code currentTimeDomain} fields at call time. Parameters that only exist while
     * processing an element (element, pane, process context, bundle contexts, restriction tracker)
     * are rejected with {@link UnsupportedOperationException}.
     */
    private class OnTimerContext
    extends DoFn<InputT, OutputT>.OnTimerContext
    implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
        private OnTimerContext() {
            // DoFn.OnTimerContext is an inner class of DoFn, so its constructor needs an
            // enclosing DoFn instance supplied through a qualified super call.
            context.doFn.super();
        }

        @Override
        public BoundedWindow window() {
            return currentWindow;
        }

        @Override
        public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access paneInfo outside of @ProcessElement methods.");
        }

        @Override
        public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access StartBundleContext outside of @StartBundle method.");
        }

        @Override
        public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access FinishBundleContext outside of @FinishBundle method.");
        }

        @Override
        public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access ProcessContext outside of @ProcessElement method.");
        }

        @Override
        public InputT element(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Element parameters are not supported.");
        }

        @Override
        public Object schemaElement(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Element parameters are not supported.");
        }

        @Override
        public Instant timestamp(DoFn<InputT, OutputT> doFn) {
            return timestamp();
        }

        @Override
        public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
            return timeDomain();
        }

        @Override
        public DoFn.OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
            return DoFnOutputReceivers.windowedReceiver(this, null);
        }

        @Override
        public DoFn.OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
            return DoFnOutputReceivers.rowReceiver(this, null, context.mainOutputSchemaCoder);
        }

        @Override
        public DoFn.MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
            // NOTE(review): the element-processing context passes context.outputCoders here;
            // confirm whether the timer path should do the same.
            return DoFnOutputReceivers.windowedMultiReceiver(this);
        }

        @Override
        public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
            return this;
        }

        @Override
        public RestrictionTracker<?, ?> restrictionTracker() {
            throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
        }

        /**
         * Looks up the state declared under {@code stateId}, reads its {@link StateSpec} from the
         * DoFn instance reflectively and binds it to the runner's state accessor.
         */
        @Override
        public State state(String stateId) {
            DoFnSignature.StateDeclaration stateDeclaration = context.doFnSignature.stateDeclarations().get(stateId);
            Preconditions.checkNotNull(stateDeclaration, "No state declaration found for %s", (Object)stateId);
            StateSpec<?> spec;
            try {
                spec = (StateSpec<?>)stateDeclaration.field().get(context.doFn);
            }
            catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
            return spec.bind(stateId, stateAccessor);
        }

        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public org.apache.beam.sdk.state.Timer timer(String timerId) {
            Preconditions.checkState(currentTimer.getValue() instanceof KV, "Accessing timer in unkeyed context. Current timer is not a KV: %s.", (Object)currentTimer);
            // FnApiTimer accepts either an element or a timer; the raw cast bridges the differing
            // type arguments.
            return new FnApiTimer(timerId, (WindowedValue)currentTimer);
        }

        @Override
        public PipelineOptions getPipelineOptions() {
            return context.pipelineOptions;
        }

        @Override
        public PipelineOptions pipelineOptions() {
            return context.pipelineOptions;
        }

        @Override
        public void output(OutputT output) {
            outputTo(mainOutputConsumers, WindowedValue.of(output, currentTimer.getTimestamp(), currentWindow, PaneInfo.NO_FIRING));
        }

        @Override
        public void outputWithTimestamp(OutputT output, Instant timestamp) {
            checkNotBeforeTimer(timestamp);
            outputTo(mainOutputConsumers, WindowedValue.of(output, timestamp, currentWindow, PaneInfo.NO_FIRING));
        }

        @Override
        public <T> void output(TupleTag<T> tag, T output) {
            outputTo(consumersFor(tag), WindowedValue.of(output, currentTimer.getTimestamp(), currentWindow, PaneInfo.NO_FIRING));
        }

        @Override
        public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
            // Validate the timestamp before the tag, matching the original check order.
            checkNotBeforeTimer(timestamp);
            outputTo(consumersFor(tag), WindowedValue.of(output, timestamp, currentWindow, PaneInfo.NO_FIRING));
        }

        @Override
        public TimeDomain timeDomain() {
            return currentTimeDomain;
        }

        @Override
        public Instant timestamp() {
            return currentTimer.getTimestamp();
        }

        /** Rejects output timestamps that are earlier than the firing timer's timestamp. */
        private void checkNotBeforeTimer(Instant timestamp) {
            Preconditions.checkArgument(!currentTimer.getTimestamp().isAfter(timestamp), "Output time %s can not be before timer timestamp %s.", (Object)timestamp, (Object)currentTimer.getTimestamp());
        }

        /** Returns the consumers registered under {@code tag}'s id, or throws if the lookup yields null. */
        @SuppressWarnings({"unchecked", "rawtypes"})
        private <T> Collection<FnDataReceiver<WindowedValue<T>>> consumersFor(TupleTag<T> tag) {
            Collection<FnDataReceiver<WindowedValue<T>>> consumers = (Collection)context.localNameToConsumer.get(tag.getId());
            if (consumers == null) {
                throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
            }
            return consumers;
        }
    }

    private class ProcessBundleContext
    extends DoFn.ProcessContext
    implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
        private ProcessBundleContext() {
            super(((FnApiDoFnRunner)FnApiDoFnRunner.this).context.doFn);
        }

        @Override
        public BoundedWindow window() {
            return FnApiDoFnRunner.this.currentWindow;
        }

        @Override
        public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
            return this.pane();
        }

        @Override
        public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access StartBundleContext outside of @StartBundle method.");
        }

        @Override
        public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access FinishBundleContext outside of @FinishBundle method.");
        }

        @Override
        public DoFn.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
            return this;
        }

        @Override
        public InputT element(DoFn<InputT, OutputT> doFn) {
            return this.element();
        }

        @Override
        public Object schemaElement(DoFn<InputT, OutputT> doFn) {
            Row row = ((FnApiDoFnRunner)FnApiDoFnRunner.this).context.schemaCoder.getToRowFunction().apply(this.element());
            return FnApiDoFnRunner.this.doFnSchemaInformation.getElementParameterSchema().getFromRowFunction().apply(row);
        }

        @Override
        public Instant timestamp(DoFn<InputT, OutputT> doFn) {
            return this.timestamp();
        }

        @Override
        public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access time domain outside of @ProcessTimer method.");
        }

        @Override
        public DoFn.OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
            return DoFnOutputReceivers.windowedReceiver(this, null);
        }

        @Override
        public DoFn.OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
            return DoFnOutputReceivers.rowReceiver(this, null, ((FnApiDoFnRunner)FnApiDoFnRunner.this).context.mainOutputSchemaCoder);
        }

        @Override
        public DoFn.MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
            return DoFnOutputReceivers.windowedMultiReceiver(this, ((FnApiDoFnRunner)FnApiDoFnRunner.this).context.outputCoders);
        }

        @Override
        public DoFn.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access OnTimerContext outside of @OnTimer methods.");
        }

        @Override
        public RestrictionTracker<?, ?> restrictionTracker() {
            throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
        }

        @Override
        public State state(String stateId) {
            StateSpec spec;
            DoFnSignature.StateDeclaration stateDeclaration = ((FnApiDoFnRunner)FnApiDoFnRunner.this).context.doFnSignature.stateDeclarations().get(stateId);
            Preconditions.checkNotNull(stateDeclaration, "No state declaration found for %s", (Object)stateId);
            try {
                spec = (StateSpec)stateDeclaration.field().get(((FnApiDoFnRunner)FnApiDoFnRunner.this).context.doFn);
            }
            catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
            return spec.bind(stateId, FnApiDoFnRunner.this.stateAccessor);
        }

        @Override
        public org.apache.beam.sdk.state.Timer timer(String timerId) {
            Preconditions.checkState(FnApiDoFnRunner.this.currentElement.getValue() instanceof KV, "Accessing timer in unkeyed context. Current element is not a KV: %s.", FnApiDoFnRunner.this.currentElement.getValue());
            return new FnApiTimer(timerId, FnApiDoFnRunner.this.currentElement);
        }

        @Override
        public PipelineOptions getPipelineOptions() {
            return ((FnApiDoFnRunner)FnApiDoFnRunner.this).context.pipelineOptions;
        }

        @Override
        public PipelineOptions pipelineOptions() {
            return ((FnApiDoFnRunner)FnApiDoFnRunner.this).context.pipelineOptions;
        }

        @Override
        public void output(OutputT output) {
            FnApiDoFnRunner.this.outputTo(FnApiDoFnRunner.this.mainOutputConsumers, WindowedValue.of(output, FnApiDoFnRunner.this.currentElement.getTimestamp(), FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentElement.getPane()));
        }

        @Override
        public void outputWithTimestamp(OutputT output, Instant timestamp) {
            FnApiDoFnRunner.this.outputTo(FnApiDoFnRunner.this.mainOutputConsumers, WindowedValue.of(output, timestamp, FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentElement.getPane()));
        }

        @Override
        public <T> void output(TupleTag<T> tag, T output) {
            List<FnDataReceiver<WindowedValue<?>>> consumers = ((FnApiDoFnRunner)FnApiDoFnRunner.this).context.localNameToConsumer.get(tag.getId());
            if (consumers == null) {
                throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
            }
            FnApiDoFnRunner.this.outputTo(consumers, WindowedValue.of(output, FnApiDoFnRunner.this.currentElement.getTimestamp(), FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentElement.getPane()));
        }

        @Override
        public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
            List<FnDataReceiver<WindowedValue<?>>> consumers = ((FnApiDoFnRunner)FnApiDoFnRunner.this).context.localNameToConsumer.get(tag.getId());
            if (consumers == null) {
                throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
            }
            FnApiDoFnRunner.this.outputTo(consumers, WindowedValue.of(output, timestamp, FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentElement.getPane()));
        }

        @Override
        public InputT element() {
            return FnApiDoFnRunner.this.currentElement.getValue();
        }

        @Override
        public <T> T sideInput(PCollectionView<T> view) {
            return FnApiDoFnRunner.this.stateAccessor.get(view, FnApiDoFnRunner.this.currentWindow);
        }

        @Override
        public Instant timestamp() {
            return FnApiDoFnRunner.this.currentElement.getTimestamp();
        }

        @Override
        public PaneInfo pane() {
            return FnApiDoFnRunner.this.currentElement.getPane();
        }

        @Override
        public void updateWatermark(Instant watermark) {
            throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn");
        }
    }

    private class FnApiTimer
    implements org.apache.beam.sdk.state.Timer {
        private final String timerId;
        private final TimeDomain timeDomain;
        private final Instant currentTimestamp;
        private final Duration allowedLateness;
        private final WindowedValue<?> currentElementOrTimer;
        private Duration period = Duration.ZERO;
        private Duration offset = Duration.ZERO;

        FnApiTimer(String timerId, WindowedValue<KV<?, ?>> currentElementOrTimer) {
            this.timerId = timerId;
            this.currentElementOrTimer = currentElementOrTimer;
            DoFnSignature.TimerDeclaration timerDeclaration = ((FnApiDoFnRunner)FnApiDoFnRunner.this).context.doFnSignature.timerDeclarations().get(timerId);
            this.timeDomain = DoFnSignatures.getTimerSpecOrThrow(timerDeclaration, ((FnApiDoFnRunner)FnApiDoFnRunner.this).context.doFn).getTimeDomain();
            switch (this.timeDomain) {
                case EVENT_TIME: {
                    this.currentTimestamp = currentElementOrTimer.getTimestamp();
                    break;
                }
                case PROCESSING_TIME: {
                    this.currentTimestamp = new Instant(DateTimeUtils.currentTimeMillis());
                    break;
                }
                case SYNCHRONIZED_PROCESSING_TIME: {
                    this.currentTimestamp = new Instant(DateTimeUtils.currentTimeMillis());
                    break;
                }
                default: {
                    throw new IllegalArgumentException(String.format("Unknown time domain %s", new Object[]{this.timeDomain}));
                }
            }
            try {
                this.allowedLateness = ((FnApiDoFnRunner)FnApiDoFnRunner.this).context.rehydratedComponents.getPCollection(((FnApiDoFnRunner)FnApiDoFnRunner.this).context.pTransform.getInputsOrThrow(timerId)).getWindowingStrategy().getAllowedLateness();
            }
            catch (IOException e) {
                throw new IllegalArgumentException(String.format("Unable to get allowed lateness for timer %s", timerId));
            }
        }

        @Override
        public void set(Instant absoluteTime) {
            if (!TimeDomain.EVENT_TIME.equals((Object)this.timeDomain)) {
                throw new IllegalArgumentException("Can only set relative timers in processing time domain. Use #setRelative()");
            }
            if (TimeDomain.EVENT_TIME.equals((Object)this.timeDomain)) {
                Instant windowExpiry = LateDataUtils.garbageCollectionTime(FnApiDoFnRunner.this.currentWindow, this.allowedLateness);
                Preconditions.checkArgument(!absoluteTime.isAfter(windowExpiry), "Attempted to set event time timer for %s but that is after the expiration of window %s", (Object)absoluteTime, (Object)windowExpiry);
            }
            this.output(absoluteTime);
        }

        @Override
        public void setRelative() {
            long millisSinceStart;
            Instant target = this.period.equals(Duration.ZERO) ? this.currentTimestamp.plus(this.offset) : ((millisSinceStart = this.currentTimestamp.plus(this.offset).getMillis() % this.period.getMillis()) == 0L ? this.currentTimestamp : this.currentTimestamp.plus(this.period).minus(millisSinceStart));
            target = this.minTargetAndGcTime(target);
            this.output(target);
        }

        @Override
        public org.apache.beam.sdk.state.Timer offset(Duration offset) {
            this.offset = offset;
            return this;
        }

        @Override
        public org.apache.beam.sdk.state.Timer align(Duration period) {
            this.period = period;
            return this;
        }

        private Instant minTargetAndGcTime(Instant target) {
            Instant windowExpiry;
            if (TimeDomain.EVENT_TIME.equals((Object)this.timeDomain) && target.isAfter(windowExpiry = LateDataUtils.garbageCollectionTime(FnApiDoFnRunner.this.currentWindow, this.allowedLateness))) {
                return windowExpiry;
            }
            return target;
        }

        private void output(Instant scheduledTime) {
            Object key = ((KV)this.currentElementOrTimer.getValue()).getKey();
            List<FnDataReceiver<WindowedValue<?>>> consumers = ((FnApiDoFnRunner)FnApiDoFnRunner.this).context.localNameToConsumer.get(this.timerId);
            FnApiDoFnRunner.this.outputTo(consumers, this.currentElementOrTimer.withValue(KV.of(key, Timer.of(scheduledTime))));
        }
    }

    /**
     * Factory that turns a prepared {@link DoFnPTransformRunnerFactory.Context} into a
     * {@link FnApiDoFnRunner}.
     */
    static class Factory<InputT, OutputT>
            extends DoFnPTransformRunnerFactory<InputT, InputT, OutputT, FnApiDoFnRunner<InputT, OutputT>> {

        Factory() {
        }

        /** Constructs a new runner for the given context. */
        @Override
        public FnApiDoFnRunner<InputT, OutputT> createRunner(
                DoFnPTransformRunnerFactory.Context<InputT, OutputT> context) {
            FnApiDoFnRunner<InputT, OutputT> runner = new FnApiDoFnRunner<>(context);
            return runner;
        }
    }

    /**
     * Registrar that maps the ParDo transform URN to a {@link Factory}. The class is annotated
     * with {@code @AutoService} for {@link PTransformRunnerFactory.Registrar}.
     */
    @AutoService(PTransformRunnerFactory.Registrar.class)
    public static class Registrar implements PTransformRunnerFactory.Registrar {

        /** Returns a single-entry map from the ParDo URN to a new {@link Factory}. */
        @Override
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            String parDoUrn = PTransformTranslation.PAR_DO_TRANSFORM_URN;
            return ImmutableMap.of(parDoUrn, new Factory());
        }
    }
}

