package org.apache.beam.repackaged.direct_java.sdk.fn.stream;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.repackaged.direct_java.sdk.fn.CancellableQueue;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;

/**
 * A {@link StreamObserver} that places incoming elements on a bounded {@link CancellableQueue} and
 * forwards them to an outbound {@link CallStreamObserver} from a separate drainer task.
 *
 * <p>The drainer task is submitted to the supplied {@link ExecutorService} at construction time. It
 * only pulls from the queue while the outbound observer reports {@code isReady()}, and otherwise
 * waits for the supplied {@link Phaser} to advance.
 *
 * <p>NOTE(review): the party that advances the phaser is not visible in this file; presumably it is
 * an on-ready handler registered on the outbound observer -- confirm against the caller.
 *
 * @param <T> the type of element forwarded to the outbound observer
 */
@ThreadSafe
public final class BufferingStreamObserver<T> implements StreamObserver<T> {
    /** Sentinel placed on the queue by {@link #onCompleted()}; recognised by identity only. */
    private static final Object POISON_PILL = new Object();

    private final CancellableQueue<T> queue;
    private final Phaser phaser;
    private final CallStreamObserver<T> outboundObserver;
    /** Handle on the drainer task; terminal calls block on it until draining has finished. */
    private final Future<?> queueDrainer;
    private final int bufferSize;

    /**
     * Marker wrapper used by {@link #onError(Throwable)} when cancelling the queue, so that the
     * drainer can tell an upstream error (forward the wrapped cause) apart from any other failure.
     */
    private static class OnErrorException extends Exception {
        public OnErrorException(Throwable th) {
            super(th);
        }

        @Override // java.lang.Throwable
        public synchronized Throwable getCause() {
            return super.getCause();
        }
    }

    /**
     * Creates the observer and immediately submits the drainer task.
     *
     * @param phaser phaser the drainer waits on whenever the outbound observer is not ready
     * @param outboundObserver observer that receives the buffered elements
     * @param executor executor on which the drainer task runs
     * @param bufferSize capacity passed to the {@link CancellableQueue} constructor
     */
    public BufferingStreamObserver(
            Phaser phaser,
            CallStreamObserver<T> outboundObserver,
            ExecutorService executor,
            int bufferSize) {
        this.phaser = phaser;
        this.bufferSize = bufferSize;
        this.queue = new CancellableQueue<>(bufferSize);
        this.outboundObserver = outboundObserver;
        // Assigned last: the drainer starts running as soon as it is submitted and reads the
        // fields initialised above.
        this.queueDrainer = executor.submit(this::drainQueue);
    }

    /**
     * Drainer loop: forwards queued elements to the outbound observer while it is ready, and
     * terminates the outbound stream on the poison pill or on any failure.
     */
    private void drainQueue() {
        try {
            while (true) {
                // Read the phase before probing readiness: if the phaser advances while the inner
                // loop is running, awaitAdvance below returns immediately instead of blocking.
                int currentPhase = this.phaser.getPhase();
                while (this.outboundObserver.isReady()) {
                    T value = this.queue.take();
                    if (value == POISON_PILL) {
                        this.outboundObserver.onCompleted();
                        return;
                    }
                    this.outboundObserver.onNext(value);
                }
                this.phaser.awaitAdvance(currentPhase);
            }
        } catch (OnErrorException e) {
            // Presumably raised by queue.take() after onError(...) cancelled the queue with this
            // wrapper; the original upstream throwable is what gets forwarded.
            this.outboundObserver.onError(e.getCause());
        } catch (Exception e) {
            // Any other failure: cancel the queue with the failure, then report it downstream.
            this.queue.cancel(e);
            this.outboundObserver.onError(e);
        }
    }

    /**
     * Enqueues {@code t} for the drainer to forward.
     *
     * @param t the element to buffer
     * @throws RuntimeException wrapping any exception thrown by the queue's {@code put}; if the
     *     calling thread was interrupted, the queue is cancelled and the interrupt flag restored
     */
    public void onNext(T t) {
        try {
            this.queue.put(t);
        } catch (InterruptedException e) {
            this.queue.cancel(e);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Cancels the queue with {@code th} wrapped in a marker exception, then blocks until the
     * drainer task has finished.
     *
     * @param th the upstream error
     * @throws RuntimeException wrapping any failure while waiting for the drainer; if the calling
     *     thread was interrupted while waiting, its interrupt flag is restored first
     */
    public void onError(Throwable th) {
        this.queue.cancel(new OnErrorException(th));
        try {
            this.queueDrainer.get();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller, consistent with onNext.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Enqueues the poison pill and blocks until the drainer task has finished.
     *
     * @throws RuntimeException wrapping any failure while enqueueing or waiting; the queue is
     *     cancelled with that failure, and if the calling thread was interrupted its interrupt
     *     flag is restored first
     */
    public void onCompleted() {
        // The sentinel is a plain Object. The cast is unchecked but erased at runtime, and
        // drainQueue only compares the sentinel by identity and returns before forwarding it.
        @SuppressWarnings("unchecked")
        T poisonPill = (T) POISON_PILL;
        try {
            this.queue.put(poisonPill);
            this.queueDrainer.get();
        } catch (InterruptedException e) {
            this.queue.cancel(e);
            // Keep the interrupt visible to the caller, consistent with onNext.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            this.queue.cancel(e);
            throw new RuntimeException(e);
        }
    }

    /** Returns the buffer size this observer was constructed with. */
    @VisibleForTesting
    public int getBufferSize() {
        return this.bufferSize;
    }
}
