package io.vertx.ext.mongo.impl;

import com.mongodb.async.AsyncBatchCursor;
import com.mongodb.async.client.MongoIterable;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:io/vertx/ext/mongo/impl/MongoIterableStream.class */
class MongoIterableStream implements ReadStream<JsonObject> {
    private final Context context;
    private final MongoIterable<JsonObject> mongoIterable;
    private final int batchSize;
    private AsyncBatchCursor<JsonObject> batchCursor;
    private Deque<JsonObject> queue;
    private Handler<JsonObject> dataHandler;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> endHandler;
    private boolean paused;
    private boolean readInProgress;
    private boolean closed;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MongoIterableStream(Context context, MongoIterable<JsonObject> mongoIterable, int i) {
        this.context = context;
        this.mongoIterable = mongoIterable;
        this.batchSize = i;
    }

    public synchronized MongoIterableStream exceptionHandler(Handler<Throwable> handler) {
        checkClosed();
        this.exceptionHandler = handler;
        return this;
    }

    private void checkClosed() {
        if (this.closed) {
            throw new IllegalArgumentException("Stream is closed");
        }
    }

    public synchronized MongoIterableStream handler(Handler<JsonObject> handler) {
        checkClosed();
        if (handler == null) {
            close();
        } else {
            this.dataHandler = handler;
            try {
                this.mongoIterable.batchCursor((asyncBatchCursor, th) -> {
                    this.context.runOnContext(r6 -> {
                        synchronized (this) {
                            if (th != null) {
                                close();
                                handleException(th);
                            } else {
                                this.batchCursor = asyncBatchCursor;
                                this.batchCursor.setBatchSize(this.batchSize);
                                if (canRead()) {
                                    doRead();
                                }
                            }
                        }
                    });
                });
            } catch (Exception e) {
                close();
                handleException(e);
            }
        }
        return this;
    }

    private boolean canRead() {
        return (this.paused || this.closed) ? false : true;
    }

    /* renamed from: pause, reason: merged with bridge method [inline-methods] */
    public synchronized MongoIterableStream m7pause() {
        checkClosed();
        this.paused = true;
        return this;
    }

    /* renamed from: resume, reason: merged with bridge method [inline-methods] */
    public synchronized MongoIterableStream m6resume() {
        checkClosed();
        if (this.paused) {
            this.paused = false;
            if (this.dataHandler != null) {
                doRead();
            }
        }
        return this;
    }

    private synchronized void doRead() {
        if (this.readInProgress) {
            return;
        }
        this.readInProgress = true;
        if (this.queue == null) {
            this.queue = new ArrayDeque(this.batchSize);
        }
        if (this.queue.isEmpty()) {
            this.context.executeBlocking(future -> {
                this.batchCursor.next((list, th) -> {
                    if (th != null) {
                        future.fail(th);
                    } else {
                        future.complete(list == null ? Collections.emptyList() : list);
                    }
                });
            }, false, asyncResult -> {
                synchronized (this) {
                    if (asyncResult.succeeded()) {
                        this.queue.addAll((Collection) asyncResult.result());
                        if (this.queue.isEmpty()) {
                            close();
                            if (this.endHandler != null) {
                                this.endHandler.handle((Object) null);
                            }
                        } else {
                            emitQueued();
                        }
                    } else {
                        close();
                        handleException(asyncResult.cause());
                    }
                }
            });
        } else {
            this.context.runOnContext(r3 -> {
                emitQueued();
            });
        }
    }

    private void handleException(Throwable th) {
        if (this.exceptionHandler != null) {
            this.exceptionHandler.handle(th);
        }
    }

    private synchronized void emitQueued() {
        while (!this.queue.isEmpty() && canRead()) {
            this.dataHandler.handle(this.queue.remove());
        }
        this.readInProgress = false;
        if (canRead()) {
            doRead();
        }
    }

    public synchronized MongoIterableStream endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    private void close() {
        this.closed = true;
        AtomicReference atomicReference = new AtomicReference();
        this.context.executeBlocking(future -> {
            synchronized (this) {
                atomicReference.set(this.batchCursor);
            }
            AsyncBatchCursor asyncBatchCursor = (AsyncBatchCursor) atomicReference.get();
            if (asyncBatchCursor != null) {
                asyncBatchCursor.close();
            }
            future.complete();
        }, false, (Handler) null);
    }

    /* renamed from: endHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream m5endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    /* renamed from: handler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream m8handler(Handler handler) {
        return handler((Handler<JsonObject>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream m9exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ StreamBase m10exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
