/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.net.grpc.listener;

import com.google.protobuf.Descriptors;
import java.util.HashMap;
import java.util.Map;
import org.ballerinalang.jvm.observability.ObserveUtils;
import org.ballerinalang.jvm.observability.ObserverContext;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;
import org.ballerinalang.net.grpc.Message;
import org.ballerinalang.net.grpc.ServerCall;
import org.ballerinalang.net.grpc.ServiceResource;
import org.ballerinalang.net.grpc.Status;
import org.ballerinalang.net.grpc.StreamObserver;
import org.ballerinalang.net.grpc.callback.StreamingCallableUnitCallBack;
import org.ballerinalang.net.grpc.callback.UnaryCallableUnitCallBack;
import org.ballerinalang.net.grpc.exception.GrpcServerException;
import org.ballerinalang.net.grpc.listener.ServerCallHandler;

/**
 * Server call handler for streaming calls. Incoming stream events are forwarded to the
 * {@code onOpen}, {@code onMessage}, {@code onError} and {@code onComplete} service resources
 * supplied at construction time.
 */
public class StreamingServerCallHandler extends ServerCallHandler {
    /** Name of the resource invoked when a call is started. */
    private static final String ON_OPEN_RESOURCE = "onOpen";
    /** Name of the resource invoked for every request message. */
    private static final String ON_MESSAGE_RESOURCE = "onMessage";
    /** Name of the resource invoked when the request stream reports an error. */
    private static final String ON_ERROR_RESOURCE = "onError";
    /** Name of the resource invoked when the request stream is completed. */
    private static final String ON_COMPLETE_RESOURCE = "onComplete";
    /** Property key under which the observer context is handed to the invoked resource. */
    private static final String OBSERVER_CONTEXT_KEY = "__observer_context__";
    /** Resources every streaming service must provide, in the order they are validated. */
    private static final String[] REQUIRED_RESOURCES = {
        ON_OPEN_RESOURCE, ON_MESSAGE_RESOURCE, ON_ERROR_RESOURCE, ON_COMPLETE_RESOURCE
    };

    private final Map<String, ServiceResource> resourceMap;

    /**
     * Creates a handler for the given method.
     *
     * @param methodDescriptor descriptor passed through to the {@link ServerCallHandler} constructor
     * @param resourceMap      service resources keyed by resource name; must contain non-null
     *                         entries for {@code onOpen}, {@code onMessage}, {@code onError}
     *                         and {@code onComplete}
     * @throws GrpcServerException if the map is null, empty, or lacks one of the required resources
     */
    public StreamingServerCallHandler(Descriptors.MethodDescriptor methodDescriptor, Map<String, ServiceResource> resourceMap) throws GrpcServerException {
        super(methodDescriptor);
        validateStreamingResources(resourceMap);
        this.resourceMap = resourceMap;
    }

    /**
     * Starts handling a call: wraps the call in a response observer, invokes the
     * {@code onOpen} resource and returns a listener that forwards subsequent call
     * events to the remaining resources.
     *
     * @param call the server call being started
     * @return listener receiving the events of this call
     */
    @Override
    public ServerCallHandler.Listener startCall(ServerCall call) {
        ServerCallHandler.ServerCallStreamObserver responseObserver = new ServerCallHandler.ServerCallStreamObserver(call);
        StreamObserver requestObserver = this.invoke(responseObserver, call.getObserverContext());
        return new StreamingServerCallListener(requestObserver, responseObserver);
    }

    /**
     * Invokes the {@code onOpen} resource and builds the observer that dispatches request
     * stream events to the {@code onMessage}, {@code onError} and {@code onComplete} resources.
     *
     * @param responseObserver observer handed to every resource invocation
     * @param context          observer context; added to the invocation properties only when
     *                         observability is enabled
     * @return observer for the request stream
     */
    private StreamObserver invoke(final StreamObserver responseObserver, final ObserverContext context) {
        // Typed as Map<String, Object> so the same property bag can be passed to every invocation.
        final Map<String, Object> properties = new HashMap<>();
        if (ObserveUtils.isObservabilityEnabled()) {
            properties.put(OBSERVER_CONTEXT_KEY, context);
        }

        StreamingCallableUnitCallBack openCallback = new StreamingCallableUnitCallBack(responseObserver, context);
        this.invokeResource(this.resourceMap.get(ON_OPEN_RESOURCE), openCallback, properties, null, responseObserver);
        // Block the calling thread until a permit is available on the onOpen callback.
        // NOTE(review): presumably released when the onOpen resource finishes - confirm in
        // StreamingCallableUnitCallBack.
        openCallback.available.acquireUninterruptibly();

        return new StreamObserver() {

            @Override
            public void onNext(Message value) {
                StreamingCallableUnitCallBack messageCallback = new StreamingCallableUnitCallBack(responseObserver, context);
                StreamingServerCallHandler.this.invokeResource(
                        StreamingServerCallHandler.this.resourceMap.get(ON_MESSAGE_RESOURCE),
                        messageCallback, properties, value, responseObserver);
            }

            @Override
            public void onError(Message error) {
                ServiceResource onError = StreamingServerCallHandler.this.resourceMap.get(ON_ERROR_RESOURCE);
                StreamingServerCallHandler.this.onErrorInvoke(onError, responseObserver, error, context);
            }

            @Override
            public void onCompleted() {
                UnaryCallableUnitCallBack completeCallback = new UnaryCallableUnitCallBack(responseObserver, Boolean.FALSE, context);
                StreamingServerCallHandler.this.invokeResource(
                        StreamingServerCallHandler.this.resourceMap.get(ON_COMPLETE_RESOURCE),
                        completeCallback, properties, null, responseObserver);
            }
        };
    }

    /**
     * Asynchronously invokes a single service resource on its own runtime.
     *
     * @param resource         resource to invoke
     * @param callback         callback passed to the runtime for this invocation
     * @param properties       invocation properties shared by all invocations of one call
     * @param message          request message used to compute the parameters, or {@code null}
     *                         when the event carries no message
     * @param responseObserver observer used to compute the parameters
     */
    private void invokeResource(ServiceResource resource, CallableUnitCallback callback, Map<String, Object> properties, Message message, StreamObserver responseObserver) {
        resource.getRuntime().invokeMethodAsync(resource.getService(), resource.getFunctionName(), callback, properties, this.computeMessageParams(resource, message, responseObserver));
    }

    /**
     * Verifies that every resource required by a streaming service is present.
     * A resource name mapped to {@code null} is treated as missing, so that the problem is
     * reported here instead of as a {@link NullPointerException} while serving a call.
     *
     * @param resourceMap service resources keyed by resource name
     * @throws GrpcServerException if the map is null or empty, or a required resource is missing
     */
    private static void validateStreamingResources(Map<String, ServiceResource> resourceMap) throws GrpcServerException {
        if (resourceMap == null || resourceMap.isEmpty()) {
            throw new GrpcServerException("Streaming service resources don't exist.");
        }
        for (String resourceName : REQUIRED_RESOURCES) {
            if (resourceMap.get(resourceName) == null) {
                throw new GrpcServerException("Streaming service " + resourceName + " resource doesn't exist.");
            }
        }
    }

    /**
     * Listener that relays call events to the request observer and records cancellation
     * on the response observer.
     */
    private static final class StreamingServerCallListener
    implements ServerCallHandler.Listener {
        private final StreamObserver requestObserver;
        private final ServerCallHandler.ServerCallStreamObserver responseObserver;
        /** Set once {@link #onHalfClose()} has been received. */
        private boolean halfClosed = false;

        StreamingServerCallListener(StreamObserver requestObserver, ServerCallHandler.ServerCallStreamObserver responseObserver) {
            this.requestObserver = requestObserver;
            this.responseObserver = responseObserver;
        }

        /** Forwards a request message to the request observer. */
        @Override
        public void onMessage(Message request) {
            this.requestObserver.onNext(request);
        }

        /** Marks the call as half closed and signals completion to the request observer. */
        @Override
        public void onHalfClose() {
            this.halfClosed = true;
            this.requestObserver.onCompleted();
        }

        /**
         * Flags the response observer as cancelled. If the half close has not been received
         * yet, the request observer is additionally notified with a CANCELLED error.
         */
        @Override
        public void onCancel() {
            this.responseObserver.cancelled = true;
            if (!this.halfClosed) {
                Message message = new Message(Status.Code.CANCELLED.toStatus().withDescription("cancelled before receiving half close").asRuntimeException());
                this.requestObserver.onError(message);
            }
        }

        /** No action is taken when the call completes. */
        @Override
        public void onComplete() {
        }
    }
}

