/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.control;

import java.util.EnumMap;
import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFnControlClient {
    private static final String FAKE_INSTRUCTION_ID = "FAKE_INSTRUCTION_ID";
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnControlClient.class);
    private static final BeamFnApi.InstructionRequest POISON_PILL = BeamFnApi.InstructionRequest.newBuilder().setInstructionId("FAKE_INSTRUCTION_ID").build();
    private final StreamObserver<BeamFnApi.InstructionResponse> outboundObserver;
    private final BlockingDeque<BeamFnApi.InstructionRequest> bufferedInstructions = new LinkedBlockingDeque<BeamFnApi.InstructionRequest>();
    private final EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>> handlers;
    private final CompletableFuture<Object> onFinish;
    private static final Object COMPLETED = new Object();

    public BeamFnControlClient(String id, Endpoints.ApiServiceDescriptor apiServiceDescriptor, ManagedChannelFactory channelFactory, OutboundObserverFactory outboundObserverFactory, EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>> handlers) {
        this.outboundObserver = outboundObserverFactory.outboundObserverFor(BeamFnControlGrpc.newStub(channelFactory.forDescriptor(apiServiceDescriptor))::control, new InboundObserver());
        this.handlers = handlers;
        this.onFinish = new CompletableFuture();
    }

    public void processInstructionRequests(Executor executor) throws InterruptedException, ExecutionException {
        BeamFnApi.InstructionRequest request;
        while (!Objects.equals(request = this.bufferedInstructions.take(), POISON_PILL)) {
            BeamFnApi.InstructionRequest currentRequest = request;
            executor.execute(() -> {
                try {
                    BeamFnApi.InstructionResponse response = this.delegateOnInstructionRequestType(currentRequest);
                    this.sendInstructionResponse(response);
                }
                catch (Error e) {
                    this.sendErrorResponse(e);
                    throw e;
                }
            });
        }
        this.onFinish.get();
    }

    public BeamFnApi.InstructionResponse delegateOnInstructionRequestType(BeamFnApi.InstructionRequest value) {
        try {
            return this.handlers.getOrDefault(value.getRequestCase(), this::missingHandler).apply(value).setInstructionId(value.getInstructionId()).build();
        }
        catch (Exception e) {
            LOG.error("Exception while trying to handle {} {}", BeamFnApi.InstructionRequest.class.getSimpleName(), value.getInstructionId(), e);
            return BeamFnApi.InstructionResponse.newBuilder().setInstructionId(value.getInstructionId()).setError(Throwables.getStackTraceAsString(e)).build();
        }
        catch (Error e) {
            LOG.error("Error thrown when handling {} {}", BeamFnApi.InstructionRequest.class.getSimpleName(), value.getInstructionId(), e);
            throw e;
        }
    }

    public void sendInstructionResponse(BeamFnApi.InstructionResponse value) {
        LOG.debug("Sending InstructionResponse {}", (Object)value);
        this.outboundObserver.onNext(value);
    }

    private void sendErrorResponse(Error e) {
        this.onFinish.completeExceptionally(e);
        this.outboundObserver.onError(Status.INTERNAL.withDescription(String.format("%s: %s", e.getClass().getName(), e.getMessage())).asException());
    }

    private BeamFnApi.InstructionResponse.Builder missingHandler(BeamFnApi.InstructionRequest request) {
        return BeamFnApi.InstructionResponse.newBuilder().setError(String.format("Unknown InstructionRequest type %s", request.getRequestCase()));
    }

    private class InboundObserver
    implements StreamObserver<BeamFnApi.InstructionRequest> {
        private InboundObserver() {
        }

        @Override
        public void onNext(BeamFnApi.InstructionRequest value) {
            LOG.debug("Received InstructionRequest {}", (Object)value);
            Uninterruptibles.putUninterruptibly(BeamFnControlClient.this.bufferedInstructions, value);
        }

        @Override
        public void onError(Throwable t) {
            this.placePoisonPillIntoQueue();
            BeamFnControlClient.this.onFinish.completeExceptionally(t);
        }

        @Override
        public void onCompleted() {
            this.placePoisonPillIntoQueue();
            BeamFnControlClient.this.onFinish.complete(COMPLETED);
        }

        private void placePoisonPillIntoQueue() {
            while (true) {
                try {
                    BeamFnControlClient.this.bufferedInstructions.putFirst(POISON_PILL);
                    return;
                }
                catch (InterruptedException interruptedException) {
                    continue;
                }
                break;
            }
        }
    }
}

