/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.beam.fn.harness.DoFnPTransformRunnerFactory;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.state.FnApiStateAccessor;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.SplittableProcessElementInvoker;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.util.Timestamps;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class SplittableProcessElementsRunner<InputT, RestrictionT, OutputT>
implements DoFnPTransformRunnerFactory.DoFnPTransformRunner<KV<InputT, RestrictionT>> {
    private final DoFnPTransformRunnerFactory.Context<InputT, OutputT> context;
    private final String mainInputId;
    private final Coder<WindowedValue<KV<InputT, RestrictionT>>> inputCoder;
    private final Collection<FnDataReceiver<WindowedValue<OutputT>>> mainOutputConsumers;
    private final DoFnInvoker<InputT, OutputT> doFnInvoker;
    private final ScheduledExecutorService executor;
    private FnApiStateAccessor stateAccessor;
    private final DoFn.StartBundleContext startBundleContext;
    private final DoFn.FinishBundleContext finishBundleContext;

    SplittableProcessElementsRunner(final DoFnPTransformRunnerFactory.Context<InputT, OutputT> context, Coder<WindowedValue<KV<InputT, RestrictionT>>> inputCoder, Collection<FnDataReceiver<WindowedValue<OutputT>>> mainOutputConsumers, String mainInputId) {
        this.context = context;
        this.mainInputId = mainInputId;
        this.inputCoder = inputCoder;
        this.mainOutputConsumers = mainOutputConsumers;
        this.doFnInvoker = DoFnInvokers.invokerFor(context.doFn);
        this.doFnInvoker.invokeSetup();
        this.executor = Executors.newSingleThreadScheduledExecutor();
        DoFn doFn = context.doFn;
        Objects.requireNonNull(doFn);
        this.startBundleContext = new DoFn.StartBundleContext(doFn){

            @Override
            public PipelineOptions getPipelineOptions() {
                return context.pipelineOptions;
            }
        };
        DoFn doFn2 = context.doFn;
        Objects.requireNonNull(doFn2);
        this.finishBundleContext = new DoFn.FinishBundleContext(doFn2){

            @Override
            public PipelineOptions getPipelineOptions() {
                return context.pipelineOptions;
            }

            @Override
            public void output(OutputT output, Instant timestamp, BoundedWindow window) {
                throw new UnsupportedOperationException();
            }

            @Override
            public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
                throw new UnsupportedOperationException();
            }
        };
    }

    @Override
    public void startBundle() {
        this.doFnInvoker.invokeStartBundle(this.startBundleContext);
    }

    @Override
    public void processElement(WindowedValue<KV<InputT, RestrictionT>> elem) {
        this.processElementTyped(elem);
    }

    private <PositionT, TrackerT extends RestrictionTracker<RestrictionT, PositionT>> void processElementTyped(WindowedValue<KV<InputT, RestrictionT>> elem) {
        Preconditions.checkArgument(elem.getWindows().size() == 1, "SPLITTABLE_PROCESS_ELEMENTS expects its input to be in 1 window, but got %s windows", elem.getWindows().size());
        WindowedValue<InputT> element = elem.withValue(elem.getValue().getKey());
        BoundedWindow window = elem.getWindows().iterator().next();
        this.stateAccessor = new FnApiStateAccessor(this.context.pipelineOptions, this.context.ptransformId, this.context.processBundleInstructionId, this.context.tagToSideInputSpecMap, this.context.beamFnStateClient, this.context.keyCoder, this.context.windowCoder, () -> elem, () -> window);
        Object tracker = this.doFnInvoker.invokeNewTracker(elem.getValue().getValue());
        OutputAndTimeBoundedSplittableProcessElementInvoker processElementInvoker = new OutputAndTimeBoundedSplittableProcessElementInvoker(this.context.doFn, this.context.pipelineOptions, new OutputWindowedValue<OutputT>(){

            @Override
            public void outputWindowedValue(OutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
                SplittableProcessElementsRunner.this.outputTo(SplittableProcessElementsRunner.this.mainOutputConsumers, WindowedValue.of(output, timestamp, windows, pane));
            }

            @Override
            public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tag, AdditionalOutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
                List<FnDataReceiver<WindowedValue<?>>> consumers = ((SplittableProcessElementsRunner)SplittableProcessElementsRunner.this).context.localNameToConsumer.get(tag.getId());
                if (consumers == null) {
                    throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
                }
                SplittableProcessElementsRunner.this.outputTo(consumers, WindowedValue.of(output, timestamp, windows, pane));
            }
        }, this.stateAccessor, this.executor, 10000, Duration.standardSeconds(10L));
        SplittableProcessElementInvoker.Result result = processElementInvoker.invokeProcessElement(this.doFnInvoker, element, tracker);
        this.stateAccessor = null;
        if (result.getContinuation().shouldResume()) {
            WindowedValue<KV<InputT, RestrictionT>> primary = element.withValue(KV.of(element.getValue(), ((RestrictionTracker)tracker).currentRestriction()));
            WindowedValue residual = element.withValue(KV.of(element.getValue(), result.getResidualRestriction()));
            ByteString.Output primaryBytes = ByteString.newOutput();
            ByteString.Output residualBytes = ByteString.newOutput();
            try {
                this.inputCoder.encode(primary, primaryBytes);
                this.inputCoder.encode(residual, residualBytes);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            BeamFnApi.BundleApplication primaryApplication = BeamFnApi.BundleApplication.newBuilder().setPtransformId(this.context.ptransformId).setInputId(this.mainInputId).setElement(primaryBytes.toByteString()).build();
            BeamFnApi.BundleApplication residualApplication = BeamFnApi.BundleApplication.newBuilder().setPtransformId(this.context.ptransformId).setInputId(this.mainInputId).setElement(residualBytes.toByteString()).build();
            this.context.splitListener.split(ImmutableList.of(primaryApplication), ImmutableList.of(BeamFnApi.DelayedBundleApplication.newBuilder().setApplication(residualApplication).setRequestedExecutionTime(Timestamps.fromMillis(System.currentTimeMillis() + result.getContinuation().resumeDelay().getMillis())).build()));
        }
    }

    @Override
    public void processTimer(String timerId, TimeDomain timeDomain, WindowedValue<KV<Object, Timer>> input) {
        throw new UnsupportedOperationException("Timers are unsupported in a SplittableDoFn.");
    }

    @Override
    public void finishBundle() {
        this.doFnInvoker.invokeFinishBundle(this.finishBundleContext);
    }

    private <T> void outputTo(Collection<FnDataReceiver<WindowedValue<T>>> consumers, WindowedValue<T> output) {
        try {
            for (FnDataReceiver<WindowedValue<WindowedValue<T>>> fnDataReceiver : consumers) {
                fnDataReceiver.accept(output);
            }
        }
        catch (Throwable t) {
            throw UserCodeException.wrap(t);
        }
    }

    static class Factory<InputT, RestrictionT, OutputT>
    extends DoFnPTransformRunnerFactory<KV<InputT, RestrictionT>, InputT, OutputT, SplittableProcessElementsRunner<InputT, RestrictionT, OutputT>> {
        Factory() {
        }

        @Override
        SplittableProcessElementsRunner<InputT, RestrictionT, OutputT> createRunner(DoFnPTransformRunnerFactory.Context<InputT, OutputT> context) {
            WindowedValue.FullWindowedValueCoder windowedCoder = WindowedValue.FullWindowedValueCoder.of(context.inputCoder, context.windowCoder);
            return new SplittableProcessElementsRunner(context, windowedCoder, context.localNameToConsumer.get(context.mainOutputTag.getId()), Iterables.getOnlyElement(context.pTransform.getInputsMap().keySet()));
        }
    }

    @AutoService(value=PTransformRunnerFactory.Registrar.class)
    public static class Registrar
    implements PTransformRunnerFactory.Registrar {
        @Override
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of(PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN, new Factory());
        }
    }
}

