/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io;

import java.io.IOException;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CompressedSource;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * Reads every {@link FileIO.ReadableFile} of the input collection with a {@link FileBasedSource}
 * built by a user-supplied factory.
 *
 * <p>The expansion has three steps:
 *
 * <ol>
 *   <li>each file is turned into one or more {@code (file, OffsetRange)} pairs,
 *   <li>the pairs are passed through {@link Reshuffle#viaRandomKey()},
 *   <li>each pair is read and the records found in that range are emitted.
 * </ol>
 *
 * <p>The output collection is given the coder passed to the constructor.
 *
 * @param <T> type of the records produced by the file-based source
 */
@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class ReadAllViaFileBasedSource<T>
extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
    private final long desiredBundleSizeBytes;
    private final SerializableFunction<String, ? extends FileBasedSource<T>> createSource;
    private final Coder<T> coder;

    /**
     * @param desiredBundleSizeBytes range size handed to {@link OffsetRange#split} when a file is
     *     split into ranges
     * @param createSource factory invoked with the string form of a file's resource id to obtain
     *     the source used for reading that file
     * @param coder coder set on the output collection
     */
    public ReadAllViaFileBasedSource(long desiredBundleSizeBytes, SerializableFunction<String, ? extends FileBasedSource<T>> createSource, Coder<T> coder) {
        this.desiredBundleSizeBytes = desiredBundleSizeBytes;
        this.createSource = createSource;
        this.coder = coder;
    }

    /**
     * Builds the split / reshuffle / read pipeline described on the class.
     *
     * @param input files to read
     * @return the records read from all files, with the configured coder applied
     */
    @Override
    public PCollection<T> expand(PCollection<FileIO.ReadableFile> input) {
        PCollection<KV<FileIO.ReadableFile, OffsetRange>> ranges =
            input.apply("Split into ranges", ParDo.of(new SplitIntoRangesFn(this.desiredBundleSizeBytes)));
        // Explicit type witness so the element type does not depend on chained-call inference.
        PCollection<KV<FileIO.ReadableFile, OffsetRange>> reshuffled =
            ranges.apply("Reshuffle", Reshuffle.<KV<FileIO.ReadableFile, OffsetRange>>viaRandomKey());
        PCollection<T> records =
            reshuffled.apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(this.createSource)));
        return records.setCoder(this.coder);
    }

    /**
     * Reads the records contained in one {@code (file, range)} pair.
     *
     * @param <T> type of the records produced by the file-based source
     */
    private static class ReadFileRangesFn<T>
    extends DoFn<KV<FileIO.ReadableFile, OffsetRange>, T> {
        private final SerializableFunction<String, ? extends FileBasedSource<T>> createSource;

        private ReadFileRangesFn(SerializableFunction<String, ? extends FileBasedSource<T>> createSource) {
            this.createSource = createSource;
        }

        /**
         * Creates a source for the element's file, restricts it to the element's offset range and
         * outputs every record the resulting reader yields.
         *
         * @param c context carrying the {@code (file, range)} element and the pipeline options
         * @throws IOException if creating, advancing or closing the reader fails
         */
        @DoFn.ProcessElement
        public void process(DoFn<KV<FileIO.ReadableFile, OffsetRange>, T>.ProcessContext c) throws IOException {
            FileIO.ReadableFile file = c.element().getKey();
            OffsetRange range = c.element().getValue();
            // The user-created source is wrapped in a CompressedSource configured with the
            // compression recorded on the file.
            FileBasedSource<T> source = CompressedSource.from(this.createSource.apply(file.getMetadata().resourceId().toString())).withCompression(file.getCompression());
            // try-with-resources guarantees the reader is closed even if reading fails part-way.
            try (BoundedSource.BoundedReader<T> reader = source.createForSubrangeOfFile(file.getMetadata(), range.getFrom(), range.getTo()).createReader(c.getPipelineOptions())) {
                for (boolean more = reader.start(); more; more = reader.advance()) {
                    c.output(reader.getCurrent());
                }
            }
        }
    }

    /** Turns each file into the offset ranges that will be read independently. */
    private static class SplitIntoRangesFn
    extends DoFn<FileIO.ReadableFile, KV<FileIO.ReadableFile, OffsetRange>> {
        private final long desiredBundleSizeBytes;

        private SplitIntoRangesFn(long desiredBundleSizeBytes) {
            this.desiredBundleSizeBytes = desiredBundleSizeBytes;
        }

        /**
         * Emits a single range covering the whole file when its metadata reports that seeking
         * is not efficient; otherwise emits one element per range returned by
         * {@link OffsetRange#split}.
         *
         * @param c context carrying the file to split
         */
        @DoFn.ProcessElement
        public void process(DoFn<FileIO.ReadableFile, KV<FileIO.ReadableFile, OffsetRange>>.ProcessContext c) {
            FileIO.ReadableFile file = c.element();
            MatchResult.Metadata metadata = file.getMetadata();
            OffsetRange wholeFile = new OffsetRange(0L, metadata.sizeBytes());
            if (!metadata.isReadSeekEfficient()) {
                c.output(KV.of(file, wholeFile));
                return;
            }
            // NOTE(review): the second argument (0) is presumably the minimum size of a split;
            // confirm against OffsetRange.split.
            for (OffsetRange range : wholeFile.split(this.desiredBundleSizeBytes, 0L)) {
                c.output(KV.of(file, range));
            }
        }
    }
}

