package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.util.concurrent.SettableFuture;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.class */
public final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    // Default for minimumDynamicSplitSize, used by the two-argument constructor.
    private static final long REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE = 0;
    // Context handed to every evaluator created by this factory.
    private final EvaluationContext evaluationContext;
    // Pipeline options handed to every evaluator created by this factory.
    private final PipelineOptions options;

    // Thread pool created in the constructor, shared with every evaluator this factory creates,
    // and shut down by cleanup().
    @VisibleForTesting
    final ExecutorService executor;
    // Threshold handed to every evaluator: a dynamic split is only requested when the source's
    // estimated size in bytes is strictly greater than this value.
    private final long minimumDynamicSplitSize;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactory$BoundedReadEvaluator.class */
    public static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<BoundedSourceShard<OutputT>> {
        private final PCollection<OutputT> outputPCollection;
        private final EvaluationContext evaluationContext;
        private final PipelineOptions options;
        private StepTransformResult.Builder resultBuilder;
        private final long minimumDynamicSplitSize;
        private final ExecutorService produceSplitExecutor;

        public BoundedReadEvaluator(AppliedPTransform<?, PCollection<OutputT>, ?> appliedPTransform, EvaluationContext evaluationContext, PipelineOptions pipelineOptions, long j, ExecutorService executorService) {
            this.evaluationContext = evaluationContext;
            this.outputPCollection = (PCollection) Iterables.getOnlyElement(appliedPTransform.getOutputs().values());
            this.resultBuilder = StepTransformResult.withoutHold(appliedPTransform);
            this.options = pipelineOptions;
            this.minimumDynamicSplitSize = j;
            this.produceSplitExecutor = executorService;
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public void processElement(WindowedValue<BoundedSourceShard<OutputT>> windowedValue) throws Exception {
            BoundedSource<OutputT> mo1330getSource = ((BoundedSourceShard) windowedValue.getValue()).mo1330getSource();
            BoundedSource.BoundedReader<OutputT> createReader = mo1330getSource.createReader(this.options);
            Throwable th = null;
            try {
                Future<BoundedSource<OutputT>> startDynamicSplitThread = startDynamicSplitThread(mo1330getSource, createReader);
                UncommittedBundle<?> createBundle = this.evaluationContext.createBundle(this.outputPCollection);
                for (boolean start = createReader.start(); start; start = createReader.advance()) {
                    createBundle.add(WindowedValue.timestampedValueInGlobalWindow(createReader.getCurrent(), createReader.getCurrentTimestamp()));
                }
                this.resultBuilder.addOutput(createBundle, new UncommittedBundle[0]);
                try {
                    BoundedSource<OutputT> boundedSource = startDynamicSplitThread.get();
                    if (boundedSource != null) {
                        this.resultBuilder.addUnprocessedElements(windowedValue.withValue(BoundedSourceShard.of(boundedSource)));
                    }
                    if (createReader != null) {
                        if (0 == 0) {
                            createReader.close();
                            return;
                        }
                        try {
                            createReader.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (ExecutionException e) {
                    throw UserCodeException.wrap(e.getCause());
                }
            } catch (Throwable th3) {
                if (createReader != null) {
                    if (0 != 0) {
                        try {
                            createReader.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        createReader.close();
                    }
                }
                throw th3;
            }
        }

        private Future<BoundedSource<OutputT>> startDynamicSplitThread(BoundedSource<OutputT> boundedSource, BoundedSource.BoundedReader<OutputT> boundedReader) throws Exception {
            if (boundedSource.getEstimatedSizeBytes(this.options) > this.minimumDynamicSplitSize) {
                return this.produceSplitExecutor.submit(new GenerateSplitAtHalfwayPoint(boundedReader));
            }
            SettableFuture create = SettableFuture.create();
            create.set(null);
            return create;
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public TransformResult<BoundedSourceShard<OutputT>> finishBundle() {
            return this.resultBuilder.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * A {@link SourceShard} that wraps a single {@link BoundedSource}.
     *
     * @param <T> the element type produced by the wrapped source
     */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactory$BoundedSourceShard.class */
    public abstract static class BoundedSourceShard<T> implements SourceShard<T> {
        /** Wraps {@code boundedSource} in a new shard instance. */
        static <T> BoundedSourceShard<T> of(BoundedSource<T> boundedSource) {
            // Diamond instead of a raw instantiation so the result is checked as BoundedSourceShard<T>.
            return new AutoValue_BoundedReadEvaluatorFactory_BoundedSourceShard<>(boundedSource);
        }

        /** Returns the wrapped source. */
        @Override // org.apache.beam.runners.direct.SourceShard
        /* renamed from: getSource */
        public abstract BoundedSource<T> mo1330getSource();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactory$GenerateSplitAtHalfwayPoint.class */
    /**
     * Task that asks a reader to split at the point halfway between the fraction it has already
     * consumed and the end of its input.
     *
     * @param <T> the element type of the reader
     */
    public static class GenerateSplitAtHalfwayPoint<T> implements Callable<BoundedSource<T>> {
        private final BoundedSource.BoundedReader<T> reader;

        private GenerateSplitAtHalfwayPoint(BoundedSource.BoundedReader<T> boundedReader) {
            this.reader = boundedReader;
        }

        /**
         * @return the source produced by the split request, or {@code null} when the reader reports
         *     no consumed fraction or reports that it has consumed everything
         */
        @Override // java.util.concurrent.Callable
        public BoundedSource<T> call() throws Exception {
            Double reported = this.reader.getFractionConsumed();
            if (reported == null) {
                return null;
            }
            double consumed = reported.doubleValue();
            if (consumed == 1.0d) {
                return null;
            }
            // Midpoint between "consumed" and 1.0, written as in the original expression.
            double splitFraction = 0.5d + (consumed / 2.0d);
            return this.reader.splitAtFraction(splitFraction);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactory$InputProvider.class */
    /**
     * Root input provider that splits a bounded read's source and wraps each split in its own
     * committed root bundle.
     *
     * @param <T> the element type produced by the source
     */
    public static class InputProvider<T> implements RootInputProvider<T, BoundedSourceShard<T>, PBegin> {
        private final EvaluationContext evaluationContext;
        private final PipelineOptions options;

        /* JADX INFO: Access modifiers changed from: package-private */
        public InputProvider(EvaluationContext evaluationContext, PipelineOptions pipelineOptions) {
            this.evaluationContext = evaluationContext;
            this.options = pipelineOptions;
        }

        /**
         * Splits the transform's source using a desired bundle size of the estimated source size
         * divided by {@code i}, and returns one committed bundle per resulting split.
         *
         * @param appliedPTransform the read transform whose source is split
         * @param i divisor applied to the estimated size to obtain the desired bundle size
         * @return one bundle per split, each holding a single {@link BoundedSourceShard} in the
         *     global window
         */
        @Override // org.apache.beam.runners.direct.RootInputProvider
        public Collection<CommittedBundle<BoundedSourceShard<T>>> getInitialInputs(AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> appliedPTransform, int i) throws Exception {
            BoundedSource<T> source = ReadTranslation.boundedSourceFromTransform(appliedPTransform);
            long estimatedBytes = source.getEstimatedSizeBytes(this.options);
            long desiredBundleBytes = estimatedBytes / i;
            List<? extends BoundedSource<T>> splits = source.split(desiredBundleBytes, this.options);

            ImmutableList.Builder<CommittedBundle<BoundedSourceShard<T>>> shards = ImmutableList.builder();
            for (BoundedSource<T> split : splits) {
                CommittedBundle<BoundedSourceShard<T>> shard =
                        this.evaluationContext
                                .<BoundedSourceShard<T>>createRootBundle()
                                .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(split)))
                                .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
                shards.add(shard);
            }
            return shards.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a factory that uses {@code REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE} as the minimum
     * dynamic split size, by delegating to the three-argument constructor.
     *
     * @param evaluationContext context handed to the evaluators this factory creates
     * @param pipelineOptions options handed to the evaluators this factory creates
     */
    public BoundedReadEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions pipelineOptions) {
        this(evaluationContext, pipelineOptions, REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE);
    }

    @VisibleForTesting
    BoundedReadEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions pipelineOptions, long j) {
        this.executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setThreadFactory(MoreExecutors.platformThreadFactory()).setDaemon(true).setNameFormat("direct-dynamic-split-requester").build());
        this.evaluationContext = evaluationContext;
        this.options = pipelineOptions;
        this.minimumDynamicSplitSize = j;
    }

    /**
     * Creates a new bounded-read evaluator for {@code appliedPTransform}. The input bundle is not
     * consulted.
     *
     * @param appliedPTransform the read transform to evaluate
     * @param committedBundle unused
     * @return a new evaluator, viewed as a {@code TransformEvaluator<InputT>}
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    @Nullable
    // The wildcard-typed transform cannot be passed to createEvaluator without a raw cast, and
    // the resulting evaluator's input type is only known to the caller.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, CommittedBundle<?> committedBundle) throws IOException {
        return (TransformEvaluator<InputT>) createEvaluator((AppliedPTransform) appliedPTransform);
    }

    /**
     * Builds a {@link BoundedReadEvaluator} for the given transform, sharing this factory's
     * context, options, split threshold and executor.
     */
    private <OutputT> TransformEvaluator<?> createEvaluator(AppliedPTransform<?, PCollection<OutputT>, ?> appliedPTransform) {
        // Diamond instead of a raw instantiation so OutputT is inferred from the transform.
        return new BoundedReadEvaluator<>(appliedPTransform, this.evaluationContext, this.options, this.minimumDynamicSplitSize, this.executor);
    }

    /**
     * Shuts down the executor used for dynamic-split requests. This calls {@code shutdown()}, not
     * {@code shutdownNow()}, and does not wait for termination.
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public void cleanup() {
        this.executor.shutdown();
    }
}
