package org.apache.beam.sdk.io.mongodb;

import com.google.common.base.Preconditions;
import com.mongodb.DB;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoURI;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.util.JSON;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.mongodb.AutoValue_MongoDbGridFSIO_Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.bson.types.ObjectId;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.class */
public class MongoDbGridFSIO {
    /**
     * Default parser used by {@link #read()}: emits every line of a GridFS file as one
     * {@code String} element, timestamped with the file's upload date.
     */
    private static final Parser<String> TEXT_PARSER = new Parser<String>() {
        /**
         * Reads {@code gridFSDBFile} line by line and passes each line to {@code parserCallback}.
         *
         * @param gridFSDBFile the GridFS file whose content is read as text
         * @param parserCallback receives each line together with the file's upload instant
         * @throws IOException if reading from or closing the underlying stream fails
         */
        @Override // org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.Parser
        public void parse(GridFSDBFile gridFSDBFile, ParserCallback<String> parserCallback) throws IOException {
            final Instant uploadInstant = new Instant(gridFSDBFile.getUploadDate().getTime());
            // Decode explicitly as UTF-8 rather than with the JVM's platform default charset,
            // so the same file yields the same lines on every worker.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(gridFSDBFile.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    parserCallback.output(line, uploadInstant);
                }
            }
        }
    };

    /* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO$Parser.class */
    /**
     * Turns the content of one GridFS file into zero or more elements of type {@code T}.
     * The interface is {@link Serializable}.
     *
     * @param <T> the type of element handed to the callback
     */
    public interface Parser<T> extends Serializable {
        /**
         * Parses one file and reports each resulting element through the callback.
         *
         * @param gridFSDBFile the GridFS file to parse
         * @param parserCallback the sink that receives the parsed elements
         * @throws IOException declared so implementations can propagate read failures
         */
        void parse(GridFSDBFile gridFSDBFile, ParserCallback<T> parserCallback) throws IOException;
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO$ParserCallback.class */
    /**
     * Sink handed to a {@link Parser} for emitting parsed elements.
     *
     * @param <T> the type of element being emitted
     */
    public interface ParserCallback<T> extends Serializable {
        /**
         * Emits an element without an explicit timestamp.
         *
         * @param t the element to emit
         */
        void output(T t);

        /**
         * Emits an element with an explicit timestamp.
         *
         * @param t the element to emit
         * @param instant the timestamp to attach; the callback built in {@code Read.apply}
         *     rejects {@code null} here
         */
        void output(T t, Instant instant);
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO$Read.class */
    public static abstract class Read<T> extends PTransform<PBegin, PCollection<T>> {

        /* JADX INFO: Access modifiers changed from: protected */
        /* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO$Read$BoundedGridFSSource.class */
        public static class BoundedGridFSSource extends BoundedSource<ObjectId> {
            // Read configuration (uri, database, bucket, filter) used to open connections and build queries.
            private Read spec;

            // Ids this source is restricted to. When null, the reader queries GridFS with a
            // cursor instead of iterating a fixed list (see GridFSReader.start()).
            @Nullable
            private List<ObjectId> objectIds;

            /* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO$Read$BoundedGridFSSource$GridFSReader.class */
            static class GridFSReader extends BoundedSource.BoundedReader<ObjectId> {
                final BoundedGridFSSource source;

                @Nullable
                final List<ObjectId> objects;
                Mongo mongo;
                DBCursor cursor;
                Iterator<ObjectId> iterator;
                ObjectId current;

                GridFSReader(BoundedGridFSSource boundedGridFSSource, List<ObjectId> list) {
                    this.source = boundedGridFSSource;
                    this.objects = list;
                }

                /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
                public BoundedSource<ObjectId> m1getCurrentSource() {
                    return this.source;
                }

                public boolean start() throws IOException {
                    if (this.objects == null) {
                        this.mongo = this.source.setupMongo();
                        this.cursor = this.source.createCursor(this.source.setupGridFS(this.mongo));
                    } else {
                        this.iterator = this.objects.iterator();
                    }
                    return advance();
                }

                public boolean advance() throws IOException {
                    if (this.iterator != null && this.iterator.hasNext()) {
                        this.current = this.iterator.next();
                        return true;
                    }
                    if (this.cursor == null || !this.cursor.hasNext()) {
                        this.current = null;
                        return false;
                    }
                    this.current = (ObjectId) this.cursor.next().getId();
                    return true;
                }

                /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
                public ObjectId m2getCurrent() throws NoSuchElementException {
                    if (this.current == null) {
                        throw new NoSuchElementException();
                    }
                    return this.current;
                }

                public Instant getCurrentTimestamp() throws NoSuchElementException {
                    if (this.current == null) {
                        throw new NoSuchElementException();
                    }
                    return new Instant(this.current.getTimestamp() * 1000);
                }

                public void close() throws IOException {
                    if (this.mongo != null) {
                        this.mongo.close();
                    }
                }
            }

            /**
             * Creates a source bound to a read configuration.
             *
             * @param read the read configuration supplying connection and query settings
             * @param list the ids this source covers, or {@code null} for "everything the query matches"
             */
            BoundedGridFSSource(Read read, List<ObjectId> list) {
                this.objectIds = list;
                this.spec = read;
            }

            /* JADX INFO: Access modifiers changed from: private */
            /**
             * Opens a Mongo client: against the configured URI when one is set, otherwise
             * using the driver's no-argument constructor.
             *
             * @return a new client; the caller is responsible for closing it
             */
            public Mongo setupMongo() {
                if (this.spec.uri() != null) {
                    return new Mongo(new MongoURI(this.spec.uri()));
                }
                return new Mongo();
            }

            /* JADX INFO: Access modifiers changed from: private */
            /**
             * Builds a GridFS handle on the given client.
             *
             * @param mongo an open client
             * @return a GridFS over the configured database (falling back to {@code "gridfs"})
             *     and the configured bucket (falling back to the driver default)
             */
            public GridFS setupGridFS(Mongo mongo) {
                final DB db;
                if (this.spec.database() == null) {
                    db = mongo.getDB("gridfs");
                } else {
                    db = mongo.getDB(this.spec.database());
                }
                if (this.spec.bucket() == null) {
                    return new GridFS(db);
                }
                return new GridFS(db, this.spec.bucket());
            }

            /* JADX INFO: Access modifiers changed from: private */
            /**
             * Opens a cursor over the GridFS file list, restricted by the configured JSON
             * filter when one is set.
             *
             * @param gridFS the GridFS handle to list files from
             * @return a cursor over the matching files, with a {@code null} sort applied
             */
            public DBCursor createCursor(GridFS gridFS) {
                if (this.spec.filter() == null) {
                    return gridFS.getFileList().sort((DBObject) null);
                }
                DBObject query = (DBObject) JSON.parse(this.spec.filter());
                return gridFS.getFileList(query).sort((DBObject) null);
            }

            /**
             * Splits the matching GridFS files into sub-sources, greedily packing consecutive
             * files so that each bundle's total length stays within {@code j} where possible.
             * A single file larger than {@code j} still gets a bundle of its own.
             *
             * @param j the desired bundle size, compared against accumulated file lengths
             * @param pipelineOptions unused
             * @return at least one source; when no file matches, a single source with an empty id list
             * @throws Exception if querying GridFS fails
             */
            public List<? extends BoundedSource<ObjectId>> splitIntoBundles(long j, PipelineOptions pipelineOptions) throws Exception {
                Mongo mongo = setupMongo();
                try {
                    DBCursor cursor = createCursor(setupGridFS(mongo));
                    List<BoundedGridFSSource> bundles = new ArrayList<>();
                    List<ObjectId> currentIds = new ArrayList<>();
                    long currentSize = 0;
                    while (cursor.hasNext()) {
                        // DBCursor.next() is typed as DBObject; length and id live on GridFSDBFile.
                        GridFSDBFile file = (GridFSDBFile) cursor.next();
                        long length = file.getLength();
                        // Close the running bundle before it would exceed the target,
                        // but never emit an empty one.
                        if (currentSize + length > j && !currentIds.isEmpty()) {
                            bundles.add(new BoundedGridFSSource(this.spec, currentIds));
                            currentSize = 0;
                            currentIds = new ArrayList<>();
                        }
                        currentIds.add((ObjectId) file.getId());
                        currentSize += length;
                    }
                    // Flush the trailing bundle; with no files at all, still return one (empty) source.
                    if (!currentIds.isEmpty() || bundles.isEmpty()) {
                        bundles.add(new BoundedGridFSSource(this.spec, currentIds));
                    }
                    return bundles;
                } finally {
                    mongo.close();
                }
            }

            /**
             * Sums the lengths of all GridFS files matched by the configured query.
             *
             * @param pipelineOptions unused
             * @return the total length of the matching files
             * @throws Exception if querying GridFS fails
             */
            public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
                Mongo mongo = setupMongo();
                try {
                    DBCursor cursor = createCursor(setupGridFS(mongo));
                    long totalBytes = 0;
                    while (cursor.hasNext()) {
                        // DBCursor.next() is typed as DBObject; getLength() is only on GridFSDBFile.
                        GridFSDBFile file = (GridFSDBFile) cursor.next();
                        totalBytes += file.getLength();
                    }
                    return totalBytes;
                } finally {
                    mongo.close();
                }
            }

            /**
             * Always reports that this source does not produce sorted keys.
             *
             * @param pipelineOptions unused
             * @return {@code false}
             */
            public boolean producesSortedKeys(PipelineOptions pipelineOptions) throws Exception {
                return false;
            }

            /**
             * Creates a reader over this source's id list (or over the full query when the
             * list is {@code null}).
             *
             * @param pipelineOptions unused
             * @return a new, not yet started reader
             */
            public BoundedSource.BoundedReader<ObjectId> createReader(PipelineOptions pipelineOptions) throws IOException {
                GridFSReader reader = new GridFSReader(this, this.objectIds);
                return reader;
            }

            /** Performs no validation; the body is intentionally empty. */
            public void validate() {
            }

            /**
             * Delegates display data to the read configuration this source was built from.
             *
             * @param builder the display data builder to populate
             */
            public void populateDisplayData(DisplayData.Builder builder) {
                this.spec.populateDisplayData(builder);
            }

            /**
             * Returns the coder for the ids this source emits.
             *
             * @return a {@link SerializableCoder} for {@link ObjectId}
             */
            public Coder<ObjectId> getDefaultOutputCoder() {
                Coder<ObjectId> idCoder = SerializableCoder.of(ObjectId.class);
                return idCoder;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO$Read$Builder.class */
        /**
         * Abstract builder for {@link Read}. The concrete implementation instantiated in
         * {@code read()} is {@code AutoValue_MongoDbGridFSIO_Read.Builder}.
         *
         * @param <T> the element type of the {@link Read} being built
         */
        public static abstract class Builder<T> {
            /** Sets the Mongo connection URI. */
            abstract Builder<T> setUri(String str);

            /** Sets the database name. */
            abstract Builder<T> setDatabase(String str);

            /** Sets the GridFS bucket name. */
            abstract Builder<T> setBucket(String str);

            /** Sets the parser that turns files into elements. */
            abstract Builder<T> setParser(Parser<T> parser);

            /** Sets the coder for the output elements. */
            abstract Builder<T> setCoder(Coder<T> coder);

            /** Sets the allowed timestamp skew. */
            abstract Builder<T> setSkew(Duration duration);

            /** Sets the JSON filter used to select files. */
            abstract Builder<T> setFilter(String str);

            /** Builds the configured {@link Read}. */
            abstract Read<T> build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Mongo connection URI; when {@code null}, the source connects with {@code new Mongo()}. */
        @Nullable
        public abstract String uri();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Database name; when {@code null}, the source uses the database {@code "gridfs"}. */
        @Nullable
        public abstract String database();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** GridFS bucket name; when {@code null}, the source uses the single-argument {@code GridFS} constructor. */
        @Nullable
        public abstract String bucket();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Parser applied to each GridFS file in {@code apply}. */
        @Nullable
        public abstract Parser<T> parser();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Coder set on the output collection in {@code apply} when non-null. */
        @Nullable
        public abstract Coder<T> coder();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Allowed timestamp skew reported by the parsing {@code DoFn}. */
        @Nullable
        public abstract Duration skew();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** JSON query restricting which files are read; when {@code null}, all files are listed. */
        @Nullable
        public abstract String filter();

        /** Returns a builder used by the {@code with*} methods to derive a modified copy. */
        abstract Builder<T> toBuilder();

        /**
         * Returns a copy of this transform that connects to the given Mongo URI.
         *
         * @param str the connection URI; must not be {@code null}
         * @throws NullPointerException if {@code str} is {@code null}
         */
        public Read<T> withUri(String str) {
            final String uri = Preconditions.checkNotNull(str);
            Builder<T> builder = toBuilder();
            return builder.setUri(uri).build();
        }

        /**
         * Returns a copy of this transform that reads from the given database.
         *
         * @param str the database name; must not be {@code null}
         * @throws NullPointerException if {@code str} is {@code null}
         */
        public Read<T> withDatabase(String str) {
            final String database = Preconditions.checkNotNull(str);
            Builder<T> builder = toBuilder();
            return builder.setDatabase(database).build();
        }

        /**
         * Returns a copy of this transform that reads from the given GridFS bucket.
         *
         * @param str the bucket name; must not be {@code null}
         * @throws NullPointerException if {@code str} is {@code null}
         */
        public Read<T> withBucket(String str) {
            final String bucket = Preconditions.checkNotNull(str);
            Builder<T> builder = toBuilder();
            return builder.setBucket(bucket).build();
        }

        /**
         * Returns a copy of this transform that uses the given parser and therefore produces
         * elements of type {@code X}. The coder is reset to {@code null} because the previous
         * coder was for the previous element type.
         *
         * @param parser the parser to apply to each file; must not be {@code null}
         * @param <X> the element type produced by {@code parser}
         * @throws NullPointerException if {@code parser} is {@code null}
         */
        public <X> Read<X> withParser(Parser<X> parser) {
            Preconditions.checkNotNull(parser);
            // The only T-typed properties (parser and coder) are both overwritten below,
            // so re-binding the builder's type parameter to X is safe.
            @SuppressWarnings("unchecked")
            Builder<X> builder = (Builder<X>) toBuilder();
            return builder.setParser(parser).setCoder(null).build();
        }

        /**
         * Returns a copy of this transform whose output uses the given coder.
         *
         * @param coder the coder for the output elements; must not be {@code null}
         * @throws NullPointerException if {@code coder} is {@code null}
         */
        public Read<T> withCoder(Coder<T> coder) {
            final Coder<T> checked = Preconditions.checkNotNull(coder);
            Builder<T> builder = toBuilder();
            return builder.setCoder(checked).build();
        }

        /**
         * Returns a copy of this transform with the given allowed timestamp skew.
         *
         * @param duration the skew; {@code null} is treated as {@link Duration#ZERO}
         */
        public Read<T> withSkew(Duration duration) {
            Duration effectiveSkew = duration;
            if (effectiveSkew == null) {
                effectiveSkew = Duration.ZERO;
            }
            return toBuilder().setSkew(effectiveSkew).build();
        }

        /**
         * Returns a copy of this transform that only reads files matching the given JSON query.
         *
         * @param str the JSON filter; passed to the builder as-is, without a null check
         */
        public Read<T> withFilter(String str) {
            Builder<T> builder = toBuilder();
            Builder<T> filtered = builder.setFilter(str);
            return filtered.build();
        }

        /**
         * Registers the configured properties as display data. Unset (null) properties are skipped.
         *
         * @param builder the display data builder to populate
         */
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            // parser() and coder() are nullable (withParser() resets the coder to null),
            // so resolve the class names defensively instead of dereferencing directly.
            Parser<T> parser = parser();
            Coder<T> coder = coder();
            String parserName = parser == null ? null : parser.getClass().getName();
            String coderName = coder == null ? null : coder.getClass().getName();
            builder.addIfNotNull(DisplayData.item("uri", uri()));
            builder.addIfNotNull(DisplayData.item("database", database()));
            builder.addIfNotNull(DisplayData.item("bucket", bucket()));
            builder.addIfNotNull(DisplayData.item("parser", parserName));
            builder.addIfNotNull(DisplayData.item("coder", coderName));
            builder.addIfNotNull(DisplayData.item("skew", skew()));
            builder.addIfNotNull(DisplayData.item("filter", filter()));
        }

        /**
         * Expands this transform: reads GridFS file ids from a {@link BoundedGridFSSource},
         * then parses each file with the configured parser in a {@link DoFn}.
         *
         * @param pBegin the pipeline start
         * @return the parsed elements, with the configured coder applied when one is set
         */
        public PCollection<T> apply(PBegin pBegin) {
            final BoundedGridFSSource boundedGridFSSource = new BoundedGridFSSource(this, null);
            PCollection<T> output = pBegin.getPipeline().apply(org.apache.beam.sdk.io.Read.from(boundedGridFSSource)).apply(ParDo.of(new DoFn<ObjectId, T>() { // from class: org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.Read.1
                Mongo mongo;
                GridFS gridfs;

                /** Opens one Mongo connection and GridFS handle per DoFn instance. */
                @DoFn.Setup
                public void setup() {
                    this.mongo = boundedGridFSSource.setupMongo();
                    this.gridfs = boundedGridFSSource.setupGridFS(this.mongo);
                }

                /** Closes the connection if setup managed to open one. */
                @DoFn.Teardown
                public void teardown() {
                    // Guard against teardown running after a setup that never assigned the client.
                    if (this.mongo != null) {
                        this.mongo.close();
                    }
                }

                /** Looks up the file for the incoming id and lets the parser emit its elements. */
                @DoFn.ProcessElement
                public void processElement(final DoFn<ObjectId, T>.ProcessContext processContext) throws IOException {
                    Read.this.parser().parse(this.gridfs.find((ObjectId) processContext.element()), new ParserCallback<T>() { // from class: org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.Read.1.1
                        @Override // org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.ParserCallback
                        public void output(T t, Instant instant) {
                            Preconditions.checkNotNull(instant);
                            processContext.outputWithTimestamp(t, instant);
                        }

                        @Override // org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.ParserCallback
                        public void output(T t) {
                            processContext.output(t);
                        }
                    });
                }

                /** Returns the configured skew, or zero when none was configured. */
                public Duration getAllowedTimestampSkew() {
                    // skew() is nullable and read() never sets it; fall back to zero,
                    // the same normalization withSkew(null) applies.
                    Duration skew = Read.this.skew();
                    return skew == null ? Duration.ZERO : skew;
                }
            }));
            if (coder() != null) {
                output.setCoder(coder());
            }
            return output;
        }
    }

    /**
     * Creates a {@link Read} that emits each line of each GridFS file as a {@code String},
     * using the default text parser and {@link StringUtf8Coder}.
     *
     * @return the default text-reading transform; connection settings are left unset
     */
    public static Read<String> read() {
        // Parameterize the builder so the whole chain is type-checked instead of raw.
        return new AutoValue_MongoDbGridFSIO_Read.Builder<String>().build().withParser(TEXT_PARSER).withCoder(StringUtf8Coder.of());
    }
}
