/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.net.grpc.listener;

import com.google.protobuf.Descriptors;
import io.netty.handler.codec.http.HttpHeaders;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.ballerinalang.jvm.BallerinaValues;
import org.ballerinalang.jvm.observability.ObserveUtils;
import org.ballerinalang.jvm.observability.ObserverContext;
import org.ballerinalang.jvm.types.BPackage;
import org.ballerinalang.jvm.types.BStreamType;
import org.ballerinalang.jvm.types.BType;
import org.ballerinalang.jvm.values.ObjectValue;
import org.ballerinalang.jvm.values.StreamValue;
import org.ballerinalang.jvm.values.connector.CallableUnitCallback;
import org.ballerinalang.net.grpc.GrpcConstants;
import org.ballerinalang.net.grpc.Message;
import org.ballerinalang.net.grpc.ServerCall;
import org.ballerinalang.net.grpc.ServiceResource;
import org.ballerinalang.net.grpc.StreamObserver;
import org.ballerinalang.net.grpc.callback.StreamingCallableUnitCallBack;
import org.ballerinalang.net.grpc.exception.GrpcServerException;
import org.ballerinalang.net.grpc.listener.ServerCallHandler;

public class StreamingServerCallHandler
extends ServerCallHandler {
    private final ServiceResource resource;
    private final BType inputType;

    public StreamingServerCallHandler(Descriptors.MethodDescriptor methodDescriptor, ServiceResource resource, BType inputType) throws GrpcServerException {
        super(methodDescriptor);
        if (resource == null) {
            throw new GrpcServerException("Streaming service resource doesn't exist.");
        }
        this.resource = resource;
        this.inputType = inputType;
    }

    @Override
    public ServerCallHandler.Listener startCall(ServerCall call) {
        ServerCallHandler.ServerCallStreamObserver responseObserver = new ServerCallHandler.ServerCallStreamObserver(call);
        StreamObserver requestObserver = this.invoke(responseObserver, call);
        return new StreamingServerCallListener(requestObserver, responseObserver);
    }

    private StreamObserver invoke(StreamObserver responseObserver, ServerCall call) {
        ObserverContext context = call.getObserverContext();
        ObjectValue streamIterator = BallerinaValues.createObjectValue((BPackage)GrpcConstants.PROTOCOL_GRPC_PKG_ID, (String)"StreamIterator", (Object[])new Object[1]);
        LinkedBlockingQueue<Message> messageQueue = new LinkedBlockingQueue<Message>();
        streamIterator.addNativeData("messageQueue", messageQueue);
        streamIterator.addNativeData("Client", (Object)this.getConnectionParameter(responseObserver));
        StreamValue requestStream = new StreamValue((BType)new BStreamType(this.inputType), streamIterator);
        this.onStreamInvoke(this.resource, requestStream, call.getHeaders(), responseObserver, context);
        return new StreamingServerRequestObserver(streamIterator, messageQueue);
    }

    void onStreamInvoke(ServiceResource resource, StreamValue requestStream, HttpHeaders headers, StreamObserver responseObserver, ObserverContext context) {
        Object[] requestParams = this.computeResourceParams(resource, requestStream, headers, responseObserver);
        HashMap<String, ObserverContext> properties = new HashMap<String, ObserverContext>();
        if (ObserveUtils.isObservabilityEnabled()) {
            properties.put("__observer_context__", context);
        }
        StreamingCallableUnitCallBack callback = new StreamingCallableUnitCallBack(responseObserver, context);
        resource.getRuntime().invokeMethodAsync(resource.getService(), resource.getFunctionName(), null, GrpcConstants.ON_MESSAGE_METADATA, (CallableUnitCallback)callback, properties, requestParams);
    }

    private static final class StreamingServerCallListener
    implements ServerCallHandler.Listener {
        private final StreamObserver requestObserver;
        private final ServerCallHandler.ServerCallStreamObserver responseObserver;
        private boolean halfClosed = false;

        StreamingServerCallListener(StreamObserver requestObserver, ServerCallHandler.ServerCallStreamObserver responseObserver) {
            this.requestObserver = requestObserver;
            this.responseObserver = responseObserver;
        }

        @Override
        public void onMessage(Message request) {
            this.requestObserver.onNext(request);
        }

        @Override
        public void onHalfClose() {
            if (!this.halfClosed) {
                this.halfClosed = true;
                this.requestObserver.onCompleted();
            }
        }

        @Override
        public void onCancel(Message message) {
            this.responseObserver.cancelled = true;
            this.requestObserver.onError(message);
        }

        @Override
        public void onComplete() {
            if (!this.halfClosed) {
                this.halfClosed = true;
                this.requestObserver.onCompleted();
            }
        }
    }

    private static final class StreamingServerRequestObserver
    implements StreamObserver {
        private final BlockingQueue<Message> messageQueue;

        StreamingServerRequestObserver(ObjectValue streamIterator, BlockingQueue<Message> messageQueue) {
            this.messageQueue = messageQueue;
        }

        @Override
        public void onNext(Message value) {
            this.messageQueue.add(value);
        }

        @Override
        public void onError(Message error) {
            this.messageQueue.add(error);
        }

        @Override
        public void onCompleted() {
            this.messageQueue.add(new Message("completedMessage", null));
        }
    }
}

