/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.CommittedResult;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.PCollectionViewWriter;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.ViewOverrideFactory;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;

/**
 * A {@link TransformEvaluatorFactory} that creates evaluators for applications of
 * {@link ViewOverrideFactory.WriteView}.
 *
 * <p>Each evaluator buffers every value contained in the iterables it receives and, when the
 * bundle is finished, passes the buffered values to a {@link PCollectionViewWriter} obtained from
 * the {@link EvaluationContext}.
 */
class ViewEvaluatorFactory
implements TransformEvaluatorFactory {
    private final EvaluationContext context;

    /**
     * Creates a factory whose evaluators obtain their view writers from {@code context}.
     *
     * @param context the evaluation context used to create {@link PCollectionViewWriter}s
     */
    ViewEvaluatorFactory(EvaluationContext context) {
        this.context = context;
    }

    /**
     * Creates an evaluator for {@code application}.
     *
     * <p>The application is forwarded to {@link #createEvaluator} through a raw cast, because the
     * wildcard-typed parameter cannot be matched against that method's fully parameterised
     * signature. NOTE(review): this presumes callers only route {@code WriteView} applications
     * to this factory - nothing in this class checks it.
     *
     * @param application the applied transform to evaluate
     * @param inputBundle the input bundle; not used by this factory
     * @return a new evaluator for {@code application}
     */
    @Override
    public <T> TransformEvaluator<T> forApplication(AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) {
        // The raw cast is the only way to hand a wildcard-typed application to the typed helper.
        @SuppressWarnings({"unchecked", "rawtypes"})
        TransformEvaluator<T> evaluator = this.createEvaluator((AppliedPTransform)application);
        return evaluator;
    }

    /** Does nothing; this factory holds no resources that need releasing. */
    @Override
    public void cleanup() throws Exception {
    }

    /**
     * Builds the evaluator that collects view contents for {@code application}.
     *
     * @param application the {@code WriteView} application; it must have exactly one input,
     *     otherwise {@code Iterables.getOnlyElement} fails
     * @return an evaluator that buffers elements and writes them to the view at bundle finish
     */
    private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(final AppliedPTransform<PCollection<Iterable<InT>>, PCollection<Iterable<InT>>, ViewOverrideFactory.WriteView<InT, OuT>> application) {
        // The inputs map is not typed by element, so narrowing to the declared input type is
        // necessarily an unchecked cast.
        @SuppressWarnings("unchecked")
        PCollection<Iterable<InT>> input = (PCollection<Iterable<InT>>)Iterables.getOnlyElement(application.getInputs().values());
        final PCollectionViewWriter<InT, OuT> writer = this.context.createPCollectionViewWriter(input, application.getTransform().getView());
        return new TransformEvaluator<Iterable<InT>>(){
            /** Values seen so far in this bundle, each carrying the metadata of its source element. */
            private final List<WindowedValue<InT>> elements = new ArrayList<>();

            /** Unpacks the iterable held by {@code element} and buffers each contained value. */
            @Override
            public void processElement(WindowedValue<Iterable<InT>> element) {
                for (InT value : element.getValue()) {
                    // withValue re-wraps the value starting from the enclosing element.
                    this.elements.add(element.withValue(value));
                }
            }

            /**
             * Hands the buffered values to the view writer and builds the bundle result.
             *
             * <p>The writer is invoked even when nothing was buffered; the
             * {@code PCOLLECTION_VIEW} additional output is reported only when at least one
             * value was buffered.
             */
            @Override
            public TransformResult<Iterable<InT>> finishBundle() {
                writer.add(this.elements);
                StepTransformResult.Builder<Iterable<InT>> resultBuilder = StepTransformResult.withoutHold(application);
                if (!this.elements.isEmpty()) {
                    resultBuilder = resultBuilder.withAdditionalOutput(CommittedResult.OutputType.PCOLLECTION_VIEW);
                }
                return resultBuilder.build();
            }
        };
    }
}

