/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.net.grpc.listener;

import com.google.protobuf.Descriptors;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import org.ballerinalang.bre.bvm.CallableUnitCallback;
import org.ballerinalang.connector.api.Executor;
import org.ballerinalang.connector.api.Resource;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.net.grpc.GrpcCallableUnitCallBack;
import org.ballerinalang.net.grpc.Message;
import org.ballerinalang.net.grpc.listener.MethodListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientStreamingListener
extends MethodListener
implements ServerCalls.ClientStreamingMethod<Message, Message> {
    private final Map<String, Resource> resourceMap;
    private static final Logger LOG = LoggerFactory.getLogger(ClientStreamingListener.class);

    public ClientStreamingListener(Descriptors.MethodDescriptor methodDescriptor, Map<String, Resource> resourceMap) {
        super(methodDescriptor);
        this.resourceMap = resourceMap;
    }

    public StreamObserver<Message> invoke(final StreamObserver<Message> responseObserver) {
        Resource onOpen = this.resourceMap.get("onOpen");
        GrpcCallableUnitCallBack callback = new GrpcCallableUnitCallBack(responseObserver, Boolean.FALSE);
        Executor.submit((Resource)onOpen, (CallableUnitCallback)callback, null, null, (BValue[])this.computeMessageParams(onOpen, null, responseObserver));
        return new StreamObserver<Message>(){

            public void onNext(Message value) {
                Resource onMessage = (Resource)ClientStreamingListener.this.resourceMap.get("onMessage");
                GrpcCallableUnitCallBack callback = new GrpcCallableUnitCallBack((StreamObserver<Message>)responseObserver, Boolean.FALSE);
                Executor.submit((Resource)onMessage, (CallableUnitCallback)callback, null, null, (BValue[])ClientStreamingListener.this.computeMessageParams(onMessage, value, (StreamObserver<Message>)responseObserver));
            }

            public void onError(Throwable t) {
                Resource onError = (Resource)ClientStreamingListener.this.resourceMap.get("onError");
                ClientStreamingListener.this.onErrorInvoke(onError, (StreamObserver<Message>)responseObserver, t);
            }

            public void onCompleted() {
                Resource onCompleted = (Resource)ClientStreamingListener.this.resourceMap.get("onComplete");
                if (onCompleted == null) {
                    String message = "Error in listener service definition. onError resource does not exists";
                    LOG.error(message);
                    throw new RuntimeException(message);
                }
                GrpcCallableUnitCallBack callback = new GrpcCallableUnitCallBack((StreamObserver<Message>)responseObserver, ClientStreamingListener.this.isEmptyResponse());
                Executor.submit((Resource)onCompleted, (CallableUnitCallback)callback, null, null, (BValue[])ClientStreamingListener.this.computeMessageParams(onCompleted, null, (StreamObserver<Message>)responseObserver));
            }
        };
    }
}

