/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.net.grpc.listener;

import com.google.protobuf.Descriptors;
import java.util.Map;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;
import org.ballerinalang.jvm.values.connector.Executor;
import org.ballerinalang.net.grpc.Message;
import org.ballerinalang.net.grpc.ServerCall;
import org.ballerinalang.net.grpc.ServiceResource;
import org.ballerinalang.net.grpc.Status;
import org.ballerinalang.net.grpc.StreamObserver;
import org.ballerinalang.net.grpc.callback.StreamingCallableUnitCallBack;
import org.ballerinalang.net.grpc.callback.UnaryCallableUnitCallBack;
import org.ballerinalang.net.grpc.exception.ServerRuntimeException;
import org.ballerinalang.net.grpc.listener.ServerCallHandler;

public class StreamingServerCallHandler
extends ServerCallHandler {
    private final Map<String, ServiceResource> resourceMap;

    public StreamingServerCallHandler(Descriptors.MethodDescriptor methodDescriptor, Map<String, ServiceResource> resourceMap) {
        super(methodDescriptor);
        this.resourceMap = resourceMap;
    }

    @Override
    public ServerCallHandler.Listener startCall(ServerCall call) {
        ServerCallHandler.ServerCallStreamObserver responseObserver = new ServerCallHandler.ServerCallStreamObserver(call);
        StreamObserver requestObserver = this.invoke(responseObserver);
        return new StreamingServerCallListener(requestObserver, responseObserver);
    }

    private StreamObserver invoke(final StreamObserver responseObserver) {
        ServiceResource onOpen = this.resourceMap.get("onOpen");
        StreamingCallableUnitCallBack callback = new StreamingCallableUnitCallBack(responseObserver);
        Executor.submit((ObjectValue)onOpen.getService(), (String)onOpen.getFunctionName(), (CallableUnitCallback)callback, null, (Object[])new Object[]{null, this.computeMessageParams(onOpen, null, responseObserver)});
        callback.available.acquireUninterruptibly();
        return new StreamObserver(){

            @Override
            public void onNext(Message value) {
                ServiceResource onMessage = (ServiceResource)StreamingServerCallHandler.this.resourceMap.get("onMessage");
                StreamingCallableUnitCallBack callback = new StreamingCallableUnitCallBack(responseObserver);
                Executor.submit((ObjectValue)onMessage.getService(), (String)onMessage.getFunctionName(), (CallableUnitCallback)callback, null, (Object[])new Object[]{null, StreamingServerCallHandler.this.computeMessageParams(onMessage, value, responseObserver)});
            }

            @Override
            public void onError(Message error) {
                ServiceResource onError = (ServiceResource)StreamingServerCallHandler.this.resourceMap.get("onError");
                StreamingServerCallHandler.this.onErrorInvoke(onError, responseObserver, error);
            }

            @Override
            public void onCompleted() {
                ServiceResource onCompleted = (ServiceResource)StreamingServerCallHandler.this.resourceMap.get("onComplete");
                if (onCompleted == null) {
                    String message = "Error in listener service definition. onError resource does not exists";
                    throw new ServerRuntimeException(message);
                }
                UnaryCallableUnitCallBack callback = new UnaryCallableUnitCallBack(responseObserver, Boolean.FALSE);
                Executor.submit((ObjectValue)onCompleted.getService(), (String)onCompleted.getFunctionName(), (CallableUnitCallback)callback, null, (Object[])new Object[]{null, StreamingServerCallHandler.this.computeMessageParams(onCompleted, null, responseObserver)});
            }
        };
    }

    private static final class StreamingServerCallListener
    implements ServerCallHandler.Listener {
        private final StreamObserver requestObserver;
        private final ServerCallHandler.ServerCallStreamObserver responseObserver;
        private boolean halfClosed = false;

        StreamingServerCallListener(StreamObserver requestObserver, ServerCallHandler.ServerCallStreamObserver responseObserver) {
            this.requestObserver = requestObserver;
            this.responseObserver = responseObserver;
        }

        @Override
        public void onMessage(Message request) {
            this.requestObserver.onNext(request);
        }

        @Override
        public void onHalfClose() {
            this.halfClosed = true;
            this.requestObserver.onCompleted();
        }

        @Override
        public void onCancel() {
            this.responseObserver.cancelled = true;
            if (!this.halfClosed) {
                Message message = new Message(Status.Code.CANCELLED.toStatus().withDescription("cancelled before receiving half close").asRuntimeException());
                this.requestObserver.onError(message);
            }
        }

        @Override
        public void onComplete() {
        }
    }
}

