/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.AutoValue_FileIO_Match;
import org.apache.beam.sdk.io.AutoValue_FileIO_MatchAll;
import org.apache.beam.sdk.io.AutoValue_FileIO_MatchConfiguration;
import org.apache.beam.sdk.io.AutoValue_FileIO_ReadMatches;
import org.apache.beam.sdk.io.AutoValue_FileIO_Write;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Requirements;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.StreamUtils;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileIO {
    private static final Logger LOG = LoggerFactory.getLogger(FileIO.class);

    /**
     * Creates a {@link Match} transform whose initial configuration is built with
     * {@link EmptyMatchTreatment#DISALLOW}.
     */
    public static Match match() {
        MatchConfiguration initialConfiguration = MatchConfiguration.create(EmptyMatchTreatment.DISALLOW);
        return new AutoValue_FileIO_Match.Builder().setConfiguration(initialConfiguration).build();
    }

    /**
     * Creates a {@link MatchAll} transform whose initial configuration is built with
     * {@link EmptyMatchTreatment#ALLOW_IF_WILDCARD}.
     */
    public static MatchAll matchAll() {
        MatchConfiguration initialConfiguration = MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD);
        return new AutoValue_FileIO_MatchAll.Builder().setConfiguration(initialConfiguration).build();
    }

    /**
     * Creates a {@link ReadMatches} transform configured with {@link Compression#AUTO} and
     * {@link ReadMatches.DirectoryTreatment#SKIP}.
     */
    public static ReadMatches readMatches() {
        ReadMatches.Builder builder = new AutoValue_FileIO_ReadMatches.Builder();
        builder = builder.setCompression(Compression.AUTO);
        builder = builder.setDirectoryTreatment(ReadMatches.DirectoryTreatment.SKIP);
        return builder.build();
    }

    /**
     * Creates a non-dynamic {@link Write} transform (destination type {@code Void}).
     *
     * <p>Defaults: {@link Compression#UNCOMPRESSED}, windowing not ignored, spilling allowed.
     *
     * @param <InputT> type of the elements being written
     */
    public static <InputT> Write<Void, InputT> write() {
        // Explicit type arguments avoid building through a raw Builder and an unchecked conversion.
        return new AutoValue_FileIO_Write.Builder<Void, InputT>()
            .setDynamic(false)
            .setCompression(Compression.UNCOMPRESSED)
            .setIgnoreWindowing(false)
            .setNoSpilling(false)
            .build();
    }

    /**
     * Creates a dynamic {@link Write} transform, i.e. one whose {@code getDynamic()} flag is true.
     *
     * <p>Defaults: {@link Compression#UNCOMPRESSED}, windowing not ignored, spilling allowed.
     *
     * @param <DestT> type of the destination computed for each element
     * @param <InputT> type of the elements being written
     */
    public static <DestT, InputT> Write<DestT, InputT> writeDynamic() {
        // Explicit type arguments avoid building through a raw Builder and an unchecked conversion.
        return new AutoValue_FileIO_Write.Builder<DestT, InputT>()
            .setDynamic(true)
            .setCompression(Compression.UNCOMPRESSED)
            .setIgnoreWindowing(false)
            .setNoSpilling(false)
            .build();
    }

    @AutoValue
    @Experimental(value=Experimental.Kind.SOURCE_SINK)
    public static abstract class Write<DestinationT, UserT>
    extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> {
        /**
         * Convenience overload of {@link #defaultNaming(ValueProvider, ValueProvider)} that wraps
         * both strings in static value providers.
         */
        public static FileNaming defaultNaming(String prefix, String suffix) {
            ValueProvider<String> prefixProvider = ValueProvider.StaticValueProvider.of(prefix);
            ValueProvider<String> suffixProvider = ValueProvider.StaticValueProvider.of(suffix);
            return Write.defaultNaming(prefixProvider, suffixProvider);
        }

        /**
         * Builds a {@link FileNaming} that assembles names from dash-separated parts:
         * the prefix, the window bounds (only for non-global windows, which must be
         * {@link IntervalWindow}s), the pane index (only when the pane is not both first and last),
         * and {@code <shard>-of-<numShards>}, followed directly by the suffix and the compression's
         * suggested suffix. Shard numbers are zero-padded to at least five digits.
         *
         * <p>The returned naming throws {@link IllegalArgumentException} if the window, pane or
         * compression is null, or if the window is neither global nor an {@link IntervalWindow}.
         */
        public static FileNaming defaultNaming(ValueProvider<String> prefix, ValueProvider<String> suffix) {
            return (window, pane, numShards, shardIndex, compression) -> {
                Preconditions.checkArgument(window != null, "window can not be null");
                Preconditions.checkArgument(pane != null, "pane can not be null");
                Preconditions.checkArgument(compression != null, "compression can not be null");

                StringBuilder filename = new StringBuilder(prefix.get());

                if (window != GlobalWindow.INSTANCE) {
                    appendDashIfNotEmpty(filename);
                    Preconditions.checkArgument(window instanceof IntervalWindow, "defaultNaming() supports only windows of type %s, but got window %s of type %s", IntervalWindow.class.getSimpleName(), window, window.getClass().getSimpleName());
                    IntervalWindow interval = (IntervalWindow) window;
                    filename.append(interval.start().toString());
                    filename.append("-");
                    filename.append(interval.end().toString());
                }

                // A pane that is both first and last is the only firing; its index is omitted.
                boolean hasMultipleFirings = !(pane.isFirst() && pane.isLast());
                if (hasMultipleFirings) {
                    appendDashIfNotEmpty(filename);
                    filename.append(pane.getIndex());
                }

                appendDashIfNotEmpty(filename);

                // Pad to five digits, or wider when numShards itself needs more characters.
                int padWidth = Math.max(5, String.valueOf(numShards).length());
                DecimalFormat shardFormat = new DecimalFormat("000000000000".substring(0, padWidth));
                filename.append(shardFormat.format(shardIndex));
                filename.append("-of-");
                filename.append(shardFormat.format(numShards));

                filename.append(suffix.get());
                filename.append(compression.getSuggestedSuffix());
                return filename.toString();
            };
        }

        /** Appends a {@code "-"} separator unless nothing has been written to {@code target} yet. */
        private static void appendDashIfNotEmpty(StringBuilder target) {
            if (target.length() > 0) {
                target.append("-");
            }
        }

        /**
         * Wraps {@code innerNaming} so that the name it produces is resolved as a file against
         * {@code baseDirectory} (treated as a directory resource); the result is the string form of
         * the resolved resource.
         */
        public static FileNaming relativeFileNaming(ValueProvider<String> baseDirectory, FileNaming innerNaming) {
            return (window, pane, numShards, shardIndex, compression) -> {
                ResourceId directory = FileSystems.matchNewResource(baseDirectory.get(), true);
                String relativeName = innerNaming.getFilename(window, pane, numShards, shardIndex, compression);
                ResourceId resolved = directory.resolve(relativeName, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
                return resolved.toString();
            };
        }

        // ---- AutoValue properties of the Write spec. Nullable ones are unset until the ----
        // ---- corresponding fluent method is called; expand() validates the combination. ----

        /** False for specs created by write(), true for specs created by writeDynamic(). */
        abstract boolean getDynamic();

        /** Function producing the Sink for a destination; set by the via() overloads. */
        @Nullable
        abstract Contextful<Contextful.Fn<DestinationT, Sink<?>>> getSinkFn();

        /** Function converting an input element to the value handed to the Sink; set by via(). */
        @Nullable
        abstract Contextful<Contextful.Fn<UserT, ?>> getOutputFn();

        /** Function computing the destination of an element; set by by(). */
        @Nullable
        abstract Contextful<Contextful.Fn<UserT, DestinationT>> getDestinationFn();

        /** Output directory; set by to(). Also used as the temp directory when none is given. */
        @Nullable
        abstract ValueProvider<String> getOutputDirectory();

        /** Filename prefix for the default naming; set by withPrefix(). */
        @Nullable
        abstract ValueProvider<String> getFilenamePrefix();

        /** Filename suffix for the default naming; set by withSuffix(). */
        @Nullable
        abstract ValueProvider<String> getFilenameSuffix();

        /** Destination-independent naming; set by withNaming(FileNaming). */
        @Nullable
        abstract FileNaming getConstantFileNaming();

        /** Per-destination naming; set by the function-taking withNaming() overloads. */
        @Nullable
        abstract Contextful<Contextful.Fn<DestinationT, FileNaming>> getFileNamingFn();

        /** Value set by withEmptyGlobalWindowDestination(); reported as the sink's default destination. */
        @Nullable
        abstract DestinationT getEmptyWindowDestination();

        /** Coder for destinations; set by withDestinationCoder(), otherwise resolved in expand(). */
        @Nullable
        abstract Coder<DestinationT> getDestinationCoder();

        /** Temporary directory; set by withTempDirectory(). */
        @Nullable
        abstract ValueProvider<String> getTempDirectory();

        /** Compression passed to the sink and to file naming; set by withCompression(). */
        abstract Compression getCompression();

        /** Fixed shard count; set by withNumShards(). Null when not fixed. */
        @Nullable
        abstract ValueProvider<Integer> getNumShards();

        /** Custom sharding transform; set by withSharding(). Consulted only when getNumShards() is null. */
        @Nullable
        abstract PTransform<PCollection<UserT>, PCollectionView<Integer>> getSharding();

        /** When false, expand() enables windowed writes on the underlying WriteFiles. */
        abstract boolean getIgnoreWindowing();

        /** When true, expand() calls withNoSpilling() on the underlying WriteFiles. */
        abstract boolean getNoSpilling();

        /** Returns a builder pre-populated with this spec's properties. */
        abstract Builder<DestinationT, UserT> toBuilder();

        /**
         * Sets the function that maps an element to its destination, given as a plain
         * {@link SerializableFunction}; it is wrapped with {@code Contextful.fn} and passed to the
         * {@link Contextful} overload.
         *
         * @throws IllegalArgumentException if {@code destinationFn} is null
         */
        public Write<DestinationT, UserT> by(SerializableFunction<UserT, DestinationT> destinationFn) {
            Preconditions.checkArgument(destinationFn != null, "destinationFn can not be null");
            Contextful<Contextful.Fn<UserT, DestinationT>> contextfulFn = Contextful.fn(destinationFn);
            return by(contextfulFn);
        }

        /**
         * Returns a copy of this spec with the given destination function.
         *
         * @throws IllegalArgumentException if {@code destinationFn} is null
         */
        public Write<DestinationT, UserT> by(Contextful<Contextful.Fn<UserT, DestinationT>> destinationFn) {
            Preconditions.checkArgument(destinationFn != null, "destinationFn can not be null");
            Builder<DestinationT, UserT> updated = toBuilder().setDestinationFn(destinationFn);
            return updated.build();
        }

        /**
         * Returns a copy of this spec that converts each element with {@code outputFn} and writes
         * the result to the {@link Sink} produced by {@code sinkFn} for the element's destination.
         *
         * @param outputFn converts an input element to the value given to the sink
         * @param sinkFn produces the sink for a destination
         * @throws IllegalArgumentException if either argument is null
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        public <OutputT> Write<DestinationT, UserT> via(Contextful<Contextful.Fn<UserT, OutputT>> outputFn, Contextful<Contextful.Fn<DestinationT, Sink<OutputT>>> sinkFn) {
            Preconditions.checkArgument(sinkFn != null, "sinkFn can not be null");
            Preconditions.checkArgument(outputFn != null, "outputFn can not be null");
            // The spec stores wildcard-typed functions. Generics are invariant, so the
            // OutputT-typed arguments must be erased explicitly before reaching the builder.
            Contextful<Contextful.Fn<DestinationT, Sink<?>>> erasedSinkFn = (Contextful) sinkFn;
            Contextful<Contextful.Fn<UserT, ?>> erasedOutputFn = (Contextful) outputFn;
            return this.toBuilder().setSinkFn(erasedSinkFn).setOutputFn(erasedOutputFn).build();
        }

        /**
         * Like {@code via(outputFn, sinkFn)}, but uses one {@code sink} for every destination:
         * the sink function is {@code SerializableFunctions.clonesOf(sink)}.
         *
         * @throws IllegalArgumentException if either argument is null
         */
        public <OutputT> Write<DestinationT, UserT> via(Contextful<Contextful.Fn<UserT, OutputT>> outputFn, Sink<OutputT> sink) {
            Preconditions.checkArgument(sink != null, "sink can not be null");
            Preconditions.checkArgument(outputFn != null, "outputFn can not be null");
            Contextful<Contextful.Fn<DestinationT, Sink<OutputT>>> cloningSinkFn = Contextful.fn(SerializableFunctions.clonesOf(sink));
            return via(outputFn, cloningSinkFn);
        }

        /**
         * Returns a copy of this spec that writes elements unchanged (identity output function) to
         * the {@link Sink} produced by {@code sinkFn} for the element's destination.
         *
         * @throws IllegalArgumentException if {@code sinkFn} is null
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Write<DestinationT, UserT> via(Contextful<Contextful.Fn<DestinationT, Sink<UserT>>> sinkFn) {
            Preconditions.checkArgument(sinkFn != null, "sinkFn can not be null");
            // The spec stores wildcard-typed functions. Generics are invariant, so the
            // concretely-typed values must be erased explicitly before reaching the builder.
            Contextful<Contextful.Fn<DestinationT, Sink<?>>> erasedSinkFn = (Contextful) sinkFn;
            Contextful<Contextful.Fn<UserT, ?>> identityOutputFn = (Contextful) Contextful.fn(SerializableFunctions.<UserT>identity());
            return this.toBuilder().setSinkFn(erasedSinkFn).setOutputFn(identityOutputFn).build();
        }

        /**
         * Like {@code via(sinkFn)}, but uses one {@code sink} for every destination: the sink
         * function is {@code SerializableFunctions.clonesOf(sink)}.
         *
         * @throws IllegalArgumentException if {@code sink} is null
         */
        public Write<DestinationT, UserT> via(Sink<UserT> sink) {
            Preconditions.checkArgument(sink != null, "sink can not be null");
            Contextful<Contextful.Fn<DestinationT, Sink<UserT>>> cloningSinkFn = Contextful.fn(SerializableFunctions.clonesOf(sink));
            return via(cloningSinkFn);
        }

        /**
         * Sets the output directory from a plain string by wrapping it in a static value provider.
         *
         * @throws IllegalArgumentException if {@code directory} is null
         */
        public Write<DestinationT, UserT> to(String directory) {
            Preconditions.checkArgument(directory != null, "directory can not be null");
            ValueProvider<String> directoryProvider = ValueProvider.StaticValueProvider.of(directory);
            return to(directoryProvider);
        }

        /**
         * Returns a copy of this spec with the given output directory.
         *
         * @throws IllegalArgumentException if {@code directory} is null
         */
        public Write<DestinationT, UserT> to(ValueProvider<String> directory) {
            Preconditions.checkArgument(directory != null, "directory can not be null");
            Builder<DestinationT, UserT> updated = toBuilder().setOutputDirectory(directory);
            return updated.build();
        }

        /**
         * Sets the filename prefix from a plain string by wrapping it in a static value provider.
         *
         * @throws IllegalArgumentException if {@code prefix} is null
         */
        public Write<DestinationT, UserT> withPrefix(String prefix) {
            Preconditions.checkArgument(prefix != null, "prefix can not be null");
            ValueProvider<String> prefixProvider = ValueProvider.StaticValueProvider.of(prefix);
            return withPrefix(prefixProvider);
        }

        /**
         * Returns a copy of this spec with the given filename prefix.
         *
         * @throws IllegalArgumentException if {@code prefix} is null
         */
        public Write<DestinationT, UserT> withPrefix(ValueProvider<String> prefix) {
            Preconditions.checkArgument(prefix != null, "prefix can not be null");
            Builder<DestinationT, UserT> updated = toBuilder().setFilenamePrefix(prefix);
            return updated.build();
        }

        /**
         * Sets the filename suffix from a plain string by wrapping it in a static value provider.
         *
         * @throws IllegalArgumentException if {@code suffix} is null
         */
        public Write<DestinationT, UserT> withSuffix(String suffix) {
            Preconditions.checkArgument(suffix != null, "suffix can not be null");
            ValueProvider<String> suffixProvider = ValueProvider.StaticValueProvider.of(suffix);
            return withSuffix(suffixProvider);
        }

        /**
         * Returns a copy of this spec with the given filename suffix.
         *
         * @throws IllegalArgumentException if {@code suffix} is null
         */
        public Write<DestinationT, UserT> withSuffix(ValueProvider<String> suffix) {
            Preconditions.checkArgument(suffix != null, "suffix can not be null");
            Builder<DestinationT, UserT> updated = toBuilder().setFilenameSuffix(suffix);
            return updated.build();
        }

        /**
         * Returns a copy of this spec that uses one destination-independent {@link FileNaming}.
         *
         * @throws IllegalArgumentException if {@code naming} is null
         */
        public Write<DestinationT, UserT> withNaming(FileNaming naming) {
            Preconditions.checkArgument(naming != null, "naming can not be null");
            Builder<DestinationT, UserT> updated = toBuilder().setConstantFileNaming(naming);
            return updated.build();
        }

        /**
         * Sets a per-destination naming function given as a plain {@link SerializableFunction};
         * it is wrapped with {@code Contextful.fn} and passed to the {@link Contextful} overload.
         *
         * @throws IllegalArgumentException if {@code namingFn} is null
         */
        public Write<DestinationT, UserT> withNaming(SerializableFunction<DestinationT, FileNaming> namingFn) {
            Preconditions.checkArgument(namingFn != null, "namingFn can not be null");
            Contextful<Contextful.Fn<DestinationT, FileNaming>> contextfulNamingFn = Contextful.fn(namingFn);
            return withNaming(contextfulNamingFn);
        }

        /**
         * Returns a copy of this spec with the given per-destination naming function.
         *
         * @throws IllegalArgumentException if {@code namingFn} is null
         */
        public Write<DestinationT, UserT> withNaming(Contextful<Contextful.Fn<DestinationT, FileNaming>> namingFn) {
            Preconditions.checkArgument(namingFn != null, "namingFn can not be null");
            Builder<DestinationT, UserT> updated = toBuilder().setFileNamingFn(namingFn);
            return updated.build();
        }

        /**
         * Sets the temporary directory from a plain string by wrapping it in a static value provider.
         *
         * @throws IllegalArgumentException if {@code tempDirectory} is null
         */
        public Write<DestinationT, UserT> withTempDirectory(String tempDirectory) {
            Preconditions.checkArgument(tempDirectory != null, "tempDirectory can not be null");
            ValueProvider<String> tempDirectoryProvider = ValueProvider.StaticValueProvider.of(tempDirectory);
            return withTempDirectory(tempDirectoryProvider);
        }

        /**
         * Returns a copy of this spec with the given temporary directory.
         *
         * @throws IllegalArgumentException if {@code tempDirectory} is null
         */
        public Write<DestinationT, UserT> withTempDirectory(ValueProvider<String> tempDirectory) {
            Preconditions.checkArgument(tempDirectory != null, "tempDirectory can not be null");
            Builder<DestinationT, UserT> updated = toBuilder().setTempDirectory(tempDirectory);
            return updated.build();
        }

        /**
         * Returns a copy of this spec with the given compression.
         *
         * @throws IllegalArgumentException if {@code compression} is null or {@link Compression#AUTO}
         */
        public Write<DestinationT, UserT> withCompression(Compression compression) {
            Preconditions.checkArgument(compression != null, "compression can not be null");
            Preconditions.checkArgument(compression != Compression.AUTO, "AUTO compression is not supported for writing");
            Builder<DestinationT, UserT> updated = toBuilder().setCompression(compression);
            return updated.build();
        }

        /**
         * Returns a copy of this spec with the given empty-window destination. The value is not
         * null-checked here.
         */
        public Write<DestinationT, UserT> withEmptyGlobalWindowDestination(DestinationT emptyWindowDestination) {
            Builder<DestinationT, UserT> updated = toBuilder().setEmptyWindowDestination(emptyWindowDestination);
            return updated.build();
        }

        /**
         * Returns a copy of this spec with an explicit coder for destinations.
         *
         * @throws IllegalArgumentException if {@code destinationCoder} is null
         */
        public Write<DestinationT, UserT> withDestinationCoder(Coder<DestinationT> destinationCoder) {
            Preconditions.checkArgument(destinationCoder != null, "destinationCoder can not be null");
            Builder<DestinationT, UserT> updated = toBuilder().setDestinationCoder(destinationCoder);
            return updated.build();
        }

        /**
         * Sets a fixed number of shards. A value of {@code 0} clears the setting (passes a null
         * provider to the {@link ValueProvider} overload).
         *
         * @throws IllegalArgumentException if {@code numShards} is negative
         */
        public Write<DestinationT, UserT> withNumShards(int numShards) {
            Preconditions.checkArgument(numShards >= 0, "numShards must be non-negative, but was: %s", numShards);
            ValueProvider<Integer> shardsProvider = null;
            if (numShards != 0) {
                shardsProvider = ValueProvider.StaticValueProvider.of(numShards);
            }
            return withNumShards(shardsProvider);
        }

        /** Returns a copy of this spec with the given (possibly null) shard-count provider. */
        public Write<DestinationT, UserT> withNumShards(@Nullable ValueProvider<Integer> numShards) {
            Builder<DestinationT, UserT> updated = toBuilder().setNumShards(numShards);
            return updated.build();
        }

        /**
         * Returns a copy of this spec with a transform that computes the number of shards as a view.
         *
         * @throws IllegalArgumentException if {@code sharding} is null
         */
        public Write<DestinationT, UserT> withSharding(PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) {
            Preconditions.checkArgument(sharding != null, "sharding can not be null");
            Builder<DestinationT, UserT> updated = toBuilder().setSharding(sharding);
            return updated.build();
        }

        /**
         * Returns a copy of this spec with the ignore-windowing flag set, which makes
         * {@code expand()} skip enabling windowed writes.
         */
        @Deprecated
        public Write<DestinationT, UserT> withIgnoreWindowing() {
            Builder<DestinationT, UserT> updated = toBuilder().setIgnoreWindowing(true);
            return updated.build();
        }

        /**
         * Returns a copy of this spec with the no-spilling flag set, which makes {@code expand()}
         * call {@code withNoSpilling()} on the underlying write transform.
         */
        public Write<DestinationT, UserT> withNoSpilling() {
            Builder<DestinationT, UserT> updated = toBuilder().setNoSpilling(true);
            return updated.build();
        }

        /**
         * Validates the naming-related settings and collapses them into a single per-destination
         * naming function.
         *
         * <ul>
         *   <li>Dynamic specs must use a function-based {@code withNaming()} and may not use
         *       {@code withPrefix()}, {@code withSuffix()} or a constant naming.
         *   <li>Non-dynamic specs use either the constant naming or, when none is set, the default
         *       naming built from the prefix (default {@code "output"}) and suffix (default empty).
         * </ul>
         *
         * <p>In both cases, when an output directory is set, the produced names are resolved
         * relative to it.
         *
         * @throws IllegalArgumentException if the settings are inconsistent
         */
        @VisibleForTesting
        Contextful<Contextful.Fn<DestinationT, FileNaming>> resolveFileNamingFn() {
            if (this.getDynamic()) {
                Preconditions.checkArgument(this.getConstantFileNaming() == null, "when using writeDynamic(), must use versions of .withNaming() that take functions from DestinationT");
                Preconditions.checkArgument(this.getFilenamePrefix() == null, ".withPrefix() requires write()");
                Preconditions.checkArgument(this.getFilenameSuffix() == null, ".withSuffix() requires write()");
                Preconditions.checkArgument(this.getFileNamingFn() != null, "when using writeDynamic(), must specify .withNaming() taking a function from DestinationT");
                return Contextful.fn((element, c) -> {
                    FileNaming naming = this.getFileNamingFn().getClosure().apply(element, c);
                    return this.getOutputDirectory() == null ? naming : Write.relativeFileNaming(this.getOutputDirectory(), naming);
                }, this.getFileNamingFn().getRequirements());
            }

            Preconditions.checkArgument(this.getFileNamingFn() == null, ".withNaming() taking a function from DestinationT requires writeDynamic()");
            FileNaming constantFileNaming;
            if (this.getConstantFileNaming() == null) {
                constantFileNaming = Write.defaultNaming((ValueProvider<String>)MoreObjects.firstNonNull(this.getFilenamePrefix(), ValueProvider.StaticValueProvider.of("output")), (ValueProvider<String>)MoreObjects.firstNonNull(this.getFilenameSuffix(), ValueProvider.StaticValueProvider.of("")));
            } else {
                // Each message names the setting that was actually checked; the constant naming
                // itself is configured through .withNaming(FileNaming).
                Preconditions.checkArgument(this.getFilenamePrefix() == null, ".withNaming() is incompatible with .withPrefix()");
                Preconditions.checkArgument(this.getFilenameSuffix() == null, ".withNaming() is incompatible with .withSuffix()");
                constantFileNaming = this.getConstantFileNaming();
            }
            if (this.getOutputDirectory() != null) {
                constantFileNaming = Write.relativeFileNaming(this.getOutputDirectory(), constantFileNaming);
            }
            return Contextful.fn(SerializableFunctions.constant(constantFileNaming));
        }

        /**
         * Validates this spec, builds a fully resolved copy of it, and applies a {@link WriteFiles}
         * transform backed by that resolved copy to {@code input}.
         *
         * <p>Resolution fills in the settings that may be left unset by the user: the destination
         * function and coder for non-dynamic writes, the per-destination naming function, and the
         * temporary directory (which falls back to the output directory).
         *
         * @throws IllegalArgumentException if a required setting is missing or settings conflict
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
            Builder<DestinationT, UserT> resolvedSpec = new AutoValue_FileIO_Write.Builder<DestinationT, UserT>();
            resolvedSpec.setDynamic(this.getDynamic());

            Preconditions.checkArgument(this.getSinkFn() != null, ".via() is required");
            resolvedSpec.setSinkFn(this.getSinkFn());
            Preconditions.checkArgument(this.getOutputFn() != null, "outputFn should have been set by .via()");
            resolvedSpec.setOutputFn(this.getOutputFn());

            if (this.getDynamic()) {
                Preconditions.checkArgument(this.getDestinationFn() != null, "when using writeDynamic(), .by() is required");
                resolvedSpec.setDestinationFn(this.getDestinationFn());
                resolvedSpec.setDestinationCoder(this.resolveDestinationCoder(input));
            } else {
                Preconditions.checkArgument(this.getDestinationFn() == null, ".by() requires writeDynamic()");
                Preconditions.checkArgument(this.getDestinationCoder() == null, ".withDestinationCoder() requires writeDynamic()");
                // Non-dynamic writes have a single (null) destination encoded with VoidCoder.
                resolvedSpec.setDestinationFn(Contextful.fn(SerializableFunctions.constant(null)));
                resolvedSpec.setDestinationCoder((Coder) VoidCoder.of());
            }

            resolvedSpec.setFileNamingFn(this.resolveFileNamingFn());
            resolvedSpec.setEmptyWindowDestination(this.getEmptyWindowDestination());

            if (this.getTempDirectory() == null) {
                Preconditions.checkArgument(this.getOutputDirectory() != null, "must specify either .withTempDirectory() or .to()");
                resolvedSpec.setTempDirectory(this.getOutputDirectory());
            } else {
                resolvedSpec.setTempDirectory(this.getTempDirectory());
            }

            resolvedSpec.setCompression(this.getCompression());
            resolvedSpec.setNumShards(this.getNumShards());
            resolvedSpec.setSharding(this.getSharding());
            resolvedSpec.setIgnoreWindowing(this.getIgnoreWindowing());
            resolvedSpec.setNoSpilling(this.getNoSpilling());
            Write<DestinationT, UserT> resolved = resolvedSpec.build();

            // Side inputs must come from the resolved spec: only there are the destination and
            // naming functions guaranteed to be populated.
            WriteFiles<UserT, DestinationT, ?> writeFiles = WriteFiles.to(new ViaFileBasedSink<>(resolved)).withSideInputs(Lists.newArrayList(resolved.getAllSideInputs()));

            // A fixed shard count wins over a sharding transform; otherwise the runner decides.
            if (this.getNumShards() != null) {
                writeFiles = writeFiles.withNumShards(this.getNumShards());
            } else if (this.getSharding() != null) {
                writeFiles = writeFiles.withSharding(this.getSharding());
            } else {
                writeFiles = writeFiles.withRunnerDeterminedSharding();
            }
            if (!this.getIgnoreWindowing()) {
                writeFiles = writeFiles.withWindowedWrites();
            }
            if (this.getNoSpilling()) {
                writeFiles = writeFiles.withNoSpilling();
            }
            return input.apply(writeFiles);
        }

        /**
         * Returns the explicitly configured destination coder, or, when none is set, the coder the
         * pipeline's coder registry provides for the output type of the destination function.
         *
         * @param input the input collection, used to reach the pipeline's coder registry
         * @throws IllegalArgumentException if no coder is configured and none can be inferred; the
         *     underlying {@link CannotProvideCoderException} is attached as the cause
         */
        private Coder<DestinationT> resolveDestinationCoder(PCollection<UserT> input) {
            Coder<DestinationT> explicitCoder = this.getDestinationCoder();
            if (explicitCoder != null) {
                return explicitCoder;
            }
            TypeDescriptor<DestinationT> destinationT = TypeDescriptors.outputOf(this.getDestinationFn().getClosure());
            try {
                return input.getPipeline().getCoderRegistry().getCoder(destinationT);
            }
            catch (CannotProvideCoderException e) {
                // Keep the registry's explanation reachable through the cause chain.
                throw new IllegalArgumentException("Unable to infer a coder for destination type (inferred from .by() as \"" + destinationT + "\") - specify it explicitly using .withDestinationCoder()", e);
            }
        }

        /**
         * Collects the side inputs of the union of the requirements of the destination, output,
         * sink and file-naming functions.
         */
        private Collection<PCollectionView<?>> getAllSideInputs() {
            Requirements combined = Requirements.union(this.getDestinationFn(), this.getOutputFn(), this.getSinkFn(), this.getFileNamingFn());
            return combined.getSideInputs();
        }

        /**
         * Adapts a {@link Write} spec to the {@link FileBasedSink} API: the temp directory and
         * compression come from the spec, and destinations, record formatting and file naming are
         * delegated to the spec's functions through {@link DynamicDestinationsAdapter}.
         */
        private static class ViaFileBasedSink<UserT, DestinationT, OutputT>
        extends FileBasedSink<UserT, DestinationT, OutputT> {
            private final Write<DestinationT, UserT> spec;

            private ViaFileBasedSink(Write<DestinationT, UserT> spec) {
                super(ValueProvider.NestedValueProvider.of(spec.getTempDirectory(), input -> FileSystems.matchNewResource(input, true)), new DynamicDestinationsAdapter<>(spec), spec.getCompression());
                this.spec = spec;
            }

            /**
             * Creates a write operation whose writers obtain a {@link Sink} from the spec's sink
             * function for their destination, open it on the output channel, forward every value
             * to it, and flush it when the write finishes.
             */
            @Override
            public FileBasedSink.WriteOperation<DestinationT, OutputT> createWriteOperation() {
                return new FileBasedSink.WriteOperation<DestinationT, OutputT>(this){

                    @Override
                    public FileBasedSink.Writer<DestinationT, OutputT> createWriter() throws Exception {
                        return new FileBasedSink.Writer<DestinationT, OutputT>(this, ""){
                            @Nullable
                            private Sink<OutputT> sink;

                            @Override
                            @SuppressWarnings({"unchecked", "rawtypes"})
                            protected void prepareWrite(WritableByteChannel channel) throws Exception {
                                // The spec stores the sink function with a wildcard element type;
                                // narrow it to OutputT through a raw cast.
                                Contextful.Fn<DestinationT, Sink<OutputT>> sinkFn = (Contextful.Fn) spec.getSinkFn().getClosure();
                                this.sink = sinkFn.apply(this.getDestination(), new Contextful.Fn.Context(){

                                    @Override
                                    public <T> T sideInput(PCollectionView<T> view) {
                                        // Unqualified on purpose: getWriteOperation() belongs to the
                                        // enclosing Writer, not to this Context.
                                        return getWriteOperation().getSink().getDynamicDestinations().sideInput(view);
                                    }
                                });
                                this.sink.open(channel);
                            }

                            @Override
                            public void write(OutputT value) throws Exception {
                                this.sink.write(value);
                            }

                            @Override
                            protected void finishWrite() throws Exception {
                                this.sink.flush();
                            }
                        };
                    }
                };
            }

            /**
             * {@link FileBasedSink.DynamicDestinations} implementation that forwards every decision
             * to the functions stored in a {@link Write} spec. Checked exceptions thrown by those
             * functions are rethrown wrapped in {@link RuntimeException}.
             */
            private static class DynamicDestinationsAdapter<UserT, DestinationT, OutputT>
            extends FileBasedSink.DynamicDestinations<UserT, DestinationT, OutputT> {
                private final Write<DestinationT, UserT> spec;
                // Lazily created; transient so it is rebuilt after deserialization.
                @Nullable
                private transient Contextful.Fn.Context context;

                private DynamicDestinationsAdapter(Write<DestinationT, UserT> spec) {
                    this.spec = spec;
                }

                /** Returns the cached context that resolves side inputs through this adapter. */
                private Contextful.Fn.Context getContext() {
                    if (this.context == null) {
                        this.context = new Contextful.Fn.Context(){

                            @Override
                            public <T> T sideInput(PCollectionView<T> view) {
                                // Must be qualified: a plain this.sideInput(view) would call this
                                // very method again and never terminate.
                                return DynamicDestinationsAdapter.this.sideInput(view);
                            }
                        };
                    }
                    return this.context;
                }

                @Override
                public OutputT formatRecord(UserT record) {
                    try {
                        return (OutputT)this.spec.getOutputFn().getClosure().apply(record, this.getContext());
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }

                @Override
                public DestinationT getDestination(UserT element) {
                    try {
                        return this.spec.getDestinationFn().getClosure().apply(element, this.getContext());
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }

                @Override
                public DestinationT getDefaultDestination() {
                    return this.spec.getEmptyWindowDestination();
                }

                /**
                 * Builds a filename policy from the spec's naming function for {@code destination}.
                 * Unwindowed names are produced with {@link GlobalWindow#INSTANCE} and
                 * {@link PaneInfo#NO_FIRING}.
                 */
                @Override
                public FileBasedSink.FilenamePolicy getFilenamePolicy(DestinationT destination) {
                    final FileNaming namingFn;
                    try {
                        namingFn = this.spec.getFileNamingFn().getClosure().apply(destination, this.getContext());
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                    return new FileBasedSink.FilenamePolicy(){

                        @Override
                        public ResourceId windowedFilename(int shardNumber, int numShards, BoundedWindow window, PaneInfo paneInfo, FileBasedSink.OutputFileHints outputFileHints) {
                            return FileSystems.matchNewResource(namingFn.getFilename(window, paneInfo, numShards, shardNumber, spec.getCompression()), false);
                        }

                        @Override
                        @Nullable
                        public ResourceId unwindowedFilename(int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
                            return FileSystems.matchNewResource(namingFn.getFilename(GlobalWindow.INSTANCE, PaneInfo.NO_FIRING, numShards, shardNumber, spec.getCompression()), false);
                        }
                    };
                }

                @Override
                public List<PCollectionView<?>> getSideInputs() {
                    return Lists.newArrayList(this.spec.getAllSideInputs());
                }

                @Override
                @Nullable
                public Coder<DestinationT> getDestinationCoder() {
                    return this.spec.getDestinationCoder();
                }
            }
        }

        /**
         * AutoValue builder for {@link Write}. Each setter corresponds to the property accessor of
         * the same name on {@link Write}; the fluent {@code with*}/{@code via}/{@code by}/{@code to}
         * methods obtain one via {@code toBuilder()}, set a property, and call {@link #build()}.
         */
        @AutoValue.Builder
        static abstract class Builder<DestinationT, UserT> {
            Builder() {
            }

            abstract Builder<DestinationT, UserT> setDynamic(boolean var1);

            abstract Builder<DestinationT, UserT> setSinkFn(Contextful<Contextful.Fn<DestinationT, Sink<?>>> var1);

            abstract Builder<DestinationT, UserT> setOutputFn(Contextful<Contextful.Fn<UserT, ?>> var1);

            abstract Builder<DestinationT, UserT> setDestinationFn(Contextful<Contextful.Fn<UserT, DestinationT>> var1);

            abstract Builder<DestinationT, UserT> setOutputDirectory(ValueProvider<String> var1);

            abstract Builder<DestinationT, UserT> setFilenamePrefix(ValueProvider<String> var1);

            abstract Builder<DestinationT, UserT> setFilenameSuffix(ValueProvider<String> var1);

            abstract Builder<DestinationT, UserT> setConstantFileNaming(FileNaming var1);

            abstract Builder<DestinationT, UserT> setFileNamingFn(Contextful<Contextful.Fn<DestinationT, FileNaming>> var1);

            abstract Builder<DestinationT, UserT> setEmptyWindowDestination(DestinationT var1);

            abstract Builder<DestinationT, UserT> setDestinationCoder(Coder<DestinationT> var1);

            abstract Builder<DestinationT, UserT> setTempDirectory(ValueProvider<String> var1);

            abstract Builder<DestinationT, UserT> setCompression(Compression var1);

            // The only setter whose parameter is explicitly @Nullable: withNumShards(0) passes null.
            abstract Builder<DestinationT, UserT> setNumShards(@Nullable ValueProvider<Integer> var1);

            abstract Builder<DestinationT, UserT> setSharding(PTransform<PCollection<UserT>, PCollectionView<Integer>> var1);

            abstract Builder<DestinationT, UserT> setIgnoreWindowing(boolean var1);

            abstract Builder<DestinationT, UserT> setNoSpilling(boolean var1);

            /** Builds the {@link Write} spec from the properties set so far. */
            abstract Write<DestinationT, UserT> build();
        }

        /**
         * Serializable strategy that produces the name of an output file.
         *
         * <p>Callers in this file pass the arguments in this order: window, pane, total number of
         * shards, index of the shard being named, and compression.
         */
        public static interface FileNaming
        extends Serializable {
            /**
             * Returns the filename for one shard.
             *
             * @param var1 window the file belongs to
             * @param var2 pane the file belongs to
             * @param var3 total number of shards
             * @param var4 index of this shard
             * @param var5 compression of the file
             */
            public String getFilename(BoundedWindow var1, PaneInfo var2, int var3, int var4, Compression var5);
        }
    }

    /**
     * Serializable writer of elements to a byte channel.
     *
     * <p>As used by {@code Write} in this file, {@link #open} is called once with the output
     * channel before any element is written, {@link #write} is called for each value, and
     * {@link #flush} is called when the write finishes.
     *
     * @param <ElementT> type of the elements accepted by {@link #write}
     */
    public static interface Sink<ElementT>
    extends Serializable {
        /** Prepares this sink to write to the given channel ({@code var1}). */
        public void open(WritableByteChannel var1) throws IOException;

        /** Writes one element ({@code var1}). */
        public void write(ElementT var1) throws IOException;

        /** Called after the last element has been written. */
        public void flush() throws IOException;
    }

    /**
     * Turns each {@link MatchResult.Metadata} into a {@link ReadableFile}, applying the configured
     * {@link Compression} and {@link DirectoryTreatment}.
     */
    @AutoValue
    public static abstract class ReadMatches
    extends PTransform<PCollection<MatchResult.Metadata>, PCollection<ReadableFile>> {
        abstract Compression getCompression();

        abstract DirectoryTreatment getDirectoryTreatment();

        abstract Builder toBuilder();

        /**
         * Returns a copy of this transform with the given compression.
         *
         * @throws IllegalArgumentException if {@code compression} is null
         */
        public ReadMatches withCompression(Compression compression) {
            Preconditions.checkArgument(compression != null, "compression can not be null");
            Builder updated = this.toBuilder().setCompression(compression);
            return updated.build();
        }

        /**
         * Returns a copy of this transform with the given handling of directories.
         *
         * @throws IllegalArgumentException if {@code directoryTreatment} is null
         */
        public ReadMatches withDirectoryTreatment(DirectoryTreatment directoryTreatment) {
            Preconditions.checkArgument(directoryTreatment != null, "directoryTreatment can not be null");
            Builder updated = this.toBuilder().setDirectoryTreatment(directoryTreatment);
            return updated.build();
        }

        @Override
        public PCollection<ReadableFile> expand(PCollection<MatchResult.Metadata> input) {
            ToReadableFileFn toReadableFile = new ToReadableFileFn(this);
            return input.apply(ParDo.of(toReadableFile));
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            String compressionText = this.getCompression().toString();
            builder.add(DisplayData.item("compression", compressionText));
            String directoryTreatmentText = this.getDirectoryTreatment().toString();
            builder.add(DisplayData.item("directoryTreatment", directoryTreatmentText));
        }

        /** Per-element conversion from match metadata to {@link ReadableFile}. */
        private static class ToReadableFileFn
        extends DoFn<MatchResult.Metadata, ReadableFile> {
            private final ReadMatches spec;

            private ToReadableFileFn(ReadMatches spec) {
                this.spec = spec;
            }

            /**
             * Skips or rejects directories according to the spec. For files, resolves
             * {@link Compression#AUTO} from the filename and emits a {@link ReadableFile} whose
             * metadata is marked seek-efficient only when the file is uncompressed.
             */
            @DoFn.ProcessElement
            public void process(DoFn.ProcessContext c) {
                MatchResult.Metadata metadata = (MatchResult.Metadata)c.element();

                if (metadata.resourceId().isDirectory()) {
                    DirectoryTreatment treatment = this.spec.getDirectoryTreatment();
                    switch (treatment) {
                        case SKIP:
                            return;
                        case PROHIBIT:
                            throw new IllegalArgumentException("Trying to read " + metadata.resourceId() + " which is a directory");
                        default:
                            throw new UnsupportedOperationException("Unknown DirectoryTreatment: " + treatment);
                    }
                }

                Compression effectiveCompression = this.spec.getCompression();
                if (effectiveCompression == Compression.AUTO) {
                    effectiveCompression = Compression.detect(metadata.resourceId().getFilename());
                }

                boolean seekEfficient = metadata.isReadSeekEfficient() && effectiveCompression == Compression.UNCOMPRESSED;
                MatchResult.Metadata adjustedMetadata = MatchResult.Metadata.builder()
                    .setResourceId(metadata.resourceId())
                    .setSizeBytes(metadata.sizeBytes())
                    .setLastModifiedMillis(metadata.lastModifiedMillis())
                    .setIsReadSeekEfficient(seekEfficient)
                    .build();
                c.output(new ReadableFile(adjustedMetadata, effectiveCompression));
            }
        }

        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract Builder setCompression(Compression var1);

            abstract Builder setDirectoryTreatment(DirectoryTreatment var1);

            abstract ReadMatches build();
        }

        /** How directories among the matched resources are handled. */
        static enum DirectoryTreatment {
            SKIP,
            PROHIBIT;

        }
    }

    /**
     * Transform that matches every filepattern in the input collection and emits the metadata of
     * the matched resources, either once or continuously depending on the {@link MatchConfiguration}.
     */
    @AutoValue
    public static abstract class MatchAll
            extends PTransform<PCollection<String>, PCollection<MatchResult.Metadata>> {
        abstract MatchConfiguration getConfiguration();

        abstract Builder toBuilder();

        /** Returns a copy of this transform that uses the given configuration. */
        public MatchAll withConfiguration(MatchConfiguration configuration) {
            return this.toBuilder().setConfiguration(configuration).build();
        }

        /** Returns a copy whose configuration uses the given empty-match treatment. */
        public MatchAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
            return this.withConfiguration(this.getConfiguration().withEmptyMatchTreatment(treatment));
        }

        /**
         * Returns a copy whose configuration has the given watch interval and termination condition.
         *
         * @param pollInterval interval stored as the configuration's watch interval
         * @param terminationCondition condition stored as the configuration's watch termination
         *     condition
         */
        @Experimental(value=Experimental.Kind.SPLITTABLE_DO_FN)
        public MatchAll continuously(Duration pollInterval, Watch.Growth.TerminationCondition<String, ?> terminationCondition) {
            return this.withConfiguration(this.getConfiguration().continuously(pollInterval, terminationCondition));
        }

        /**
         * Matches the input filepatterns and reshuffles the results via a random key.
         *
         * <p>Without a watch interval each filepattern is matched once by {@link MatchFn}. With a
         * watch interval the filepatterns are polled through {@code Watch.growthOf} and only the
         * values of its output are kept.
         */
        @Override
        public PCollection<MatchResult.Metadata> expand(PCollection<String> input) {
            MatchConfiguration config = this.getConfiguration();
            PCollection<MatchResult.Metadata> matches;
            if (config.getWatchInterval() == null) {
                matches = input.apply(
                        "Match filepatterns",
                        ParDo.of(new MatchFn(config.getEmptyMatchTreatment())));
            } else {
                matches = input
                        .apply(
                                "Continuously match filepatterns",
                                Watch.growthOf(
                                                Contextful.of(new MatchPollFn(), Requirements.empty()),
                                                new ExtractFilenameFn())
                                        .withPollInterval(config.getWatchInterval())
                                        .withTerminationPerInput(config.getWatchTerminationCondition()))
                        .apply(Values.create());
            }
            return matches.apply(Reshuffle.viaRandomKey());
        }

        /**
         * Maps a match to the string form of its resource id.
         *
         * <p>NOTE(review): passed as the second argument of {@code Watch.growthOf}; presumably the
         * key used to recognize already-seen outputs - confirm against Watch.
         */
        private static class ExtractFilenameFn
                implements SerializableFunction<MatchResult.Metadata, String> {
            private ExtractFilenameFn() {
            }

            @Override
            public String apply(MatchResult.Metadata input) {
                return input.resourceId().toString();
            }
        }

        /**
         * Poll function for continuous matching. Each poll matches the filepattern with
         * {@link EmptyMatchTreatment#ALLOW} (the configured treatment is not consulted here) and
         * reports an incomplete result timestamped, and watermarked, with the current instant.
         */
        private static class MatchPollFn
                extends Watch.Growth.PollFn<String, MatchResult.Metadata> {
            private MatchPollFn() {
            }

            @Override
            public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Contextful.Fn.Context c) throws Exception {
                Instant now = Instant.now();
                return Watch.Growth.PollResult.incomplete(now, FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata()).withWatermark(now);
            }
        }

        /** One-shot matching: emits the metadata of everything the filepattern matches. */
        private static class MatchFn extends DoFn<String, MatchResult.Metadata> {
            private final EmptyMatchTreatment emptyMatchTreatment;

            public MatchFn(EmptyMatchTreatment emptyMatchTreatment) {
                this.emptyMatchTreatment = emptyMatchTreatment;
            }

            /** Matches the current filepattern, logs the number of matches and outputs each one. */
            @DoFn.ProcessElement
            public void process(ProcessContext c) throws Exception {
                // ProcessContext here is the one inherited from DoFn<String, Metadata>, so
                // element() and output() are typed and need no casts.
                String filepattern = c.element();
                MatchResult match = FileSystems.match(filepattern, this.emptyMatchTreatment);
                LOG.info("Matched {} files for pattern {}", match.metadata().size(), filepattern);
                for (MatchResult.Metadata metadata : match.metadata()) {
                    c.output(metadata);
                }
            }
        }

        /** AutoValue builder for {@link MatchAll}. */
        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract Builder setConfiguration(MatchConfiguration var1);

            abstract MatchAll build();
        }
    }

    /**
     * Root transform that matches a single filepattern. It creates a one-element collection holding
     * the filepattern and delegates the matching to {@code FileIO.matchAll()} with this transform's
     * configuration.
     */
    @AutoValue
    public static abstract class Match
            extends PTransform<PBegin, PCollection<MatchResult.Metadata>> {
        @Nullable
        abstract ValueProvider<String> getFilepattern();

        abstract MatchConfiguration getConfiguration();

        abstract Builder toBuilder();

        /** Returns a copy of this transform that matches the given, statically known filepattern. */
        public Match filepattern(String filepattern) {
            return this.filepattern(ValueProvider.StaticValueProvider.of(filepattern));
        }

        /** Returns a copy of this transform that matches the filepattern supplied by the provider. */
        public Match filepattern(ValueProvider<String> filepattern) {
            return this.toBuilder().setFilepattern(filepattern).build();
        }

        /** Returns a copy of this transform that uses the given configuration. */
        public Match withConfiguration(MatchConfiguration configuration) {
            return this.toBuilder().setConfiguration(configuration).build();
        }

        /** Returns a copy whose configuration uses the given empty-match treatment. */
        public Match withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
            return this.withConfiguration(this.getConfiguration().withEmptyMatchTreatment(treatment));
        }

        /**
         * Returns a copy whose configuration has the given watch interval and termination condition.
         *
         * @param pollInterval interval stored as the configuration's watch interval
         * @param terminationCondition condition stored as the configuration's watch termination
         *     condition
         */
        @Experimental(value=Experimental.Kind.SPLITTABLE_DO_FN)
        public Match continuously(Duration pollInterval, Watch.Growth.TerminationCondition<String, ?> terminationCondition) {
            return this.withConfiguration(this.getConfiguration().continuously(pollInterval, terminationCondition));
        }

        /**
         * Creates the filepattern collection and applies {@code FileIO.matchAll()} to it.
         *
         * @throws IllegalArgumentException if no filepattern has been set on this transform
         */
        @Override
        public PCollection<MatchResult.Metadata> expand(PBegin input) {
            // The filepattern property is nullable; fail here with a clear message instead of
            // handing a null provider to Create.ofProvider.
            ValueProvider<String> filepattern = this.getFilepattern();
            Preconditions.checkArgument(filepattern != null, "filepattern must be set before applying FileIO.Match");
            return input
                    .apply("Create filepattern", Create.ofProvider(filepattern, StringUtf8Coder.of()))
                    .apply("Via MatchAll", FileIO.matchAll().withConfiguration(this.getConfiguration()));
        }

        /** AutoValue builder for {@link Match}. */
        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract Builder setFilepattern(ValueProvider<String> var1);

            abstract Builder setConfiguration(MatchConfiguration var1);

            abstract Match build();
        }
    }

    /**
     * Immutable, serializable settings describing how filepatterns are matched: the treatment of
     * patterns that match nothing and, optionally, a watch interval with a termination condition.
     */
    @AutoValue
    public static abstract class MatchConfiguration implements HasDisplayData, Serializable {

        abstract EmptyMatchTreatment getEmptyMatchTreatment();

        @Nullable
        abstract Duration getWatchInterval();

        @Nullable
        abstract Watch.Growth.TerminationCondition<String, ?> getWatchTerminationCondition();

        abstract Builder toBuilder();

        /** Creates a configuration in which only the empty-match treatment is set. */
        public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) {
            Builder fresh = new AutoValue_FileIO_MatchConfiguration.Builder();
            return fresh.setEmptyMatchTreatment(emptyMatchTreatment).build();
        }

        /** Returns a copy of this configuration with the given empty-match treatment. */
        public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
            Builder copy = toBuilder();
            return copy.setEmptyMatchTreatment(treatment).build();
        }

        /** Returns a copy of this configuration with the given watch interval and termination condition. */
        public MatchConfiguration continuously(Duration interval, Watch.Growth.TerminationCondition<String, ?> condition) {
            return toBuilder()
                    .setWatchInterval(interval)
                    .setWatchTerminationCondition(condition)
                    .build();
        }

        /**
         * Adds the empty-match treatment and, when it is not null, the watch interval to the
         * display data.
         */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            builder
                    .add(
                            DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
                                    .withLabel("Treatment of filepatterns that match no files"))
                    .addIfNotNull(
                            DisplayData.item("watchForNewFilesInterval", getWatchInterval())
                                    .withLabel("Interval to watch for new files"));
        }

        /** AutoValue builder for {@link MatchConfiguration}. */
        @AutoValue.Builder
        static abstract class Builder {
            Builder() {
            }

            abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment var1);

            abstract Builder setWatchInterval(Duration var1);

            abstract Builder setWatchTerminationCondition(Watch.Growth.TerminationCondition<String, ?> var1);

            abstract MatchConfiguration build();
        }
    }

    /**
     * A matched file paired with the compression used to read it, with helpers for opening the
     * file and reading its whole content.
     */
    public static final class ReadableFile {
        private final MatchResult.Metadata metadata;
        private final Compression compression;

        ReadableFile(MatchResult.Metadata metadata, Compression compression) {
            this.metadata = metadata;
            this.compression = compression;
        }

        /** Returns the metadata this file was created with. */
        public MatchResult.Metadata getMetadata() {
            return metadata;
        }

        /** Returns the compression this file was created with. */
        public Compression getCompression() {
            return compression;
        }

        /**
         * Opens the file's resource and wraps the channel with this file's compression via
         * {@code readDecompressed}.
         *
         * @throws IOException if opening the resource or wrapping the channel fails
         */
        public ReadableByteChannel open() throws IOException {
            ReadableByteChannel raw = FileSystems.open(metadata.resourceId());
            return compression.readDecompressed(raw);
        }

        /**
         * Opens the file as a seekable channel.
         *
         * @throws IllegalStateException if the metadata does not report the file as seek-efficient
         * @throws IOException if opening the file fails
         */
        public SeekableByteChannel openSeekable() throws IOException {
            boolean seekable = getMetadata().isReadSeekEfficient();
            Preconditions.checkState(seekable, "The file %s is not seekable", metadata.resourceId());
            return (SeekableByteChannel) open();
        }

        /**
         * Reads the full content of {@link #open()} into a byte array; the stream is closed on exit.
         *
         * @throws IOException if opening or reading fails
         */
        public byte[] readFullyAsBytes() throws IOException {
            try (InputStream in = Channels.newInputStream(open())) {
                return StreamUtils.getBytesWithoutClosing(in);
            }
        }

        /**
         * Reads the full content and decodes it as UTF-8.
         *
         * @throws IOException if opening or reading fails
         */
        public String readFullyAsUTF8String() throws IOException {
            byte[] content = readFullyAsBytes();
            return new String(content, StandardCharsets.UTF_8);
        }

        @Override
        public String toString() {
            return "ReadableFile{metadata=" + metadata + ", compression=" + compression + '}';
        }

        /** Two files are equal when they are of the same class and have equal metadata and compression. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null) {
                return false;
            }
            if (getClass() != o.getClass()) {
                return false;
            }
            ReadableFile other = (ReadableFile) o;
            if (!Objects.equal(metadata, other.metadata)) {
                return false;
            }
            return compression == other.compression;
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(metadata, compression);
        }
    }
}

