package org.apache.beam.runners.core.fn;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.Closeable;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.runners.core.java.repackaged.com.google.common.util.concurrent.ListenableFuture;
import org.apache.beam.runners.core.java.repackaged.com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/core/fn/FnApiControlClient.class */
public class FnApiControlClient implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(FnApiControlClient.class);
    private final StreamObserver<BeamFnApi.InstructionRequest> requestReceiver;
    private final ResponseStreamObserver responseObserver = new ResponseStreamObserver();
    private final Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequests = new ConcurrentHashMap();
    private volatile boolean isClosed;

    /* loaded from: input_file:org/apache/beam/runners/core/fn/FnApiControlClient$ResponseStreamObserver.class */
    private class ResponseStreamObserver implements StreamObserver<BeamFnApi.InstructionResponse> {
        private ResponseStreamObserver() {
        }

        public void onNext(BeamFnApi.InstructionResponse instructionResponse) {
            FnApiControlClient.LOG.debug("Received InstructionResponse {}", instructionResponse);
            SettableFuture settableFuture = (SettableFuture) FnApiControlClient.this.outstandingRequests.remove(instructionResponse.getInstructionId());
            if (settableFuture != null) {
                settableFuture.set(instructionResponse);
            }
        }

        public void onCompleted() {
            FnApiControlClient.this.closeAndTerminateOutstandingRequests(new IllegalStateException("SDK harness closed connection"));
        }

        public void onError(Throwable th) {
            FnApiControlClient.LOG.error("{} received error {}", FnApiControlClient.class.getSimpleName(), th);
            FnApiControlClient.this.closeAndTerminateOutstandingRequests(th);
        }
    }

    private FnApiControlClient(StreamObserver<BeamFnApi.InstructionRequest> streamObserver) {
        this.requestReceiver = streamObserver;
    }

    public static FnApiControlClient forRequestObserver(StreamObserver<BeamFnApi.InstructionRequest> streamObserver) {
        return new FnApiControlClient(streamObserver);
    }

    public synchronized ListenableFuture<BeamFnApi.InstructionResponse> handle(BeamFnApi.InstructionRequest instructionRequest) {
        LOG.debug("Sending InstructionRequest {}", instructionRequest);
        SettableFuture<BeamFnApi.InstructionResponse> create = SettableFuture.create();
        this.outstandingRequests.put(instructionRequest.getInstructionId(), create);
        this.requestReceiver.onNext(instructionRequest);
        return create;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamObserver<BeamFnApi.InstructionResponse> asResponseObserver() {
        return this.responseObserver;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        closeAndTerminateOutstandingRequests(new IllegalStateException("Runner closed connection"));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void closeAndTerminateOutstandingRequests(Throwable th) {
        if (this.isClosed) {
            return;
        }
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(this.outstandingRequests);
        this.outstandingRequests.clear();
        this.isClosed = true;
        if (concurrentHashMap.isEmpty()) {
            this.requestReceiver.onCompleted();
            return;
        }
        this.requestReceiver.onError(new StatusRuntimeException(Status.CANCELLED.withDescription(th.getMessage())));
        LOG.error("{} closed, clearing outstanding requests {}", FnApiControlClient.class.getSimpleName(), concurrentHashMap);
        Iterator it = concurrentHashMap.values().iterator();
        while (it.hasNext()) {
            ((SettableFuture) it.next()).setException(th);
        }
    }
}
