/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.net.grpc;

import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.LastHttpContent;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.ballerinalang.jvm.runtime.BLangThreadFactory;
import org.ballerinalang.net.grpc.ClientCall;
import org.ballerinalang.net.grpc.Codec;
import org.ballerinalang.net.grpc.Decompressor;
import org.ballerinalang.net.grpc.DecompressorRegistry;
import org.ballerinalang.net.grpc.InboundMessage;
import org.ballerinalang.net.grpc.MessageUtils;
import org.ballerinalang.net.grpc.Status;
import org.wso2.transport.http.netty.contract.HttpClientConnectorListener;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;

public class ClientConnectorListener
implements HttpClientConnectorListener {
    private Status transportError;
    private HttpHeaders transportErrorMetadata;
    private boolean headersReceived;
    private ClientInboundStateListener stateListener;
    private ExecutorService workerExecutor = Executors.newFixedThreadPool(10, (ThreadFactory)new BLangThreadFactory(new ThreadGroup("grpc-worker"), "grpc-worker-thread-pool"));

    ClientConnectorListener(ClientCall.ClientStreamListener streamListener) {
        this.stateListener = new ClientInboundStateListener(0x400000, streamListener);
    }

    public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
        this.stateListener.setDecompressorRegistry(decompressorRegistry);
    }

    public void onMessage(HttpCarbonMessage httpMessage) {
        InboundMessage inboundMessage = new InboundMessage(httpMessage);
        if (this.isValid(inboundMessage)) {
            this.stateListener.inboundHeadersReceived(inboundMessage.getHeaders());
        }
        this.workerExecutor.execute(() -> {
            try {
                HttpContent httpContent = inboundMessage.getHttpCarbonMessage().getHttpContent();
                while (httpContent != null) {
                    if (this.transportError != null) {
                        this.transportError = this.transportError.augmentDescription("MESSAGE DATA: " + MessageUtils.readAsString(httpContent, Charset.forName("UTF-8")));
                        httpContent.release();
                        if (this.transportError.getDescription() != null && this.transportError.getDescription().length() > 1000 || httpContent instanceof LastHttpContent) {
                            this.stateListener.transportReportStatus(this.transportError, false, this.transportErrorMetadata);
                            break;
                        }
                    } else {
                        this.stateListener.inboundDataReceived(httpContent);
                        if (httpContent instanceof LastHttpContent) {
                            LastHttpContent lastHttpContent = (LastHttpContent)httpContent;
                            if (lastHttpContent.decoderResult() != null && lastHttpContent.decoderResult().isFailure()) {
                                this.transportError = Status.Code.ABORTED.toStatus().withDescription(lastHttpContent.decoderResult().cause().getMessage());
                            } else if (lastHttpContent.trailingHeaders().isEmpty()) {
                                this.transportError = Status.Code.INTERNAL.toStatus().withDescription("Received unexpected EOS on DATA frame from server.");
                                this.transportErrorMetadata = new DefaultHttpHeaders();
                                this.stateListener.transportReportStatus(this.transportError, false, this.transportErrorMetadata);
                            } else {
                                this.transportTrailersReceived(lastHttpContent.trailingHeaders());
                            }
                            break;
                        }
                    }
                    httpContent = inboundMessage.getHttpCarbonMessage().getHttpContent();
                }
            }
            catch (RuntimeException e) {
                this.transportError = this.transportError != null ? this.transportError.augmentDescription(e.getMessage()) : Status.fromThrowable(e);
                this.stateListener.transportReportStatus(this.transportError, false, this.transportErrorMetadata);
            }
        });
    }

    public void onError(Throwable throwable) {
        this.transportError = this.transportError != null ? this.transportError.augmentDescription(throwable.getMessage()) : Status.Code.UNAVAILABLE.toStatus().withCause(throwable);
        this.stateListener.transportReportStatus(this.transportError, false, this.transportErrorMetadata);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean isValid(InboundMessage inboundMessage) {
        HttpHeaders headers = inboundMessage.getHeaders();
        if (headers == null) {
            this.transportError = Status.Code.INTERNAL.toStatus().withDescription("Message headers is null");
            return false;
        }
        if (this.transportError != null) {
            this.transportError = this.transportError.augmentDescription("headers: " + headers);
            return false;
        }
        try {
            if (this.headersReceived) {
                this.transportError = Status.Code.INTERNAL.toStatus().withDescription("Received headers twice");
                boolean bl = false;
                return bl;
            }
            int httpStatus = inboundMessage.getStatus();
            if (httpStatus >= 100 && httpStatus < 200) {
                boolean bl = false;
                return bl;
            }
            this.headersReceived = true;
            this.transportError = this.validateInitialMetadata(inboundMessage);
            boolean bl = this.transportError == null;
            return bl;
        }
        finally {
            if (this.transportError != null) {
                this.transportErrorMetadata = headers;
            }
        }
    }

    private Status validateInitialMetadata(InboundMessage inboundMessage) {
        String contentType = inboundMessage.getHeaders().get("content-type");
        if (!MessageUtils.isGrpcContentType(contentType)) {
            return MessageUtils.httpStatusToGrpcStatus(inboundMessage.getStatus()).augmentDescription("invalid content-type: " + contentType);
        }
        return null;
    }

    private void transportTrailersReceived(HttpHeaders trailers) {
        if (this.transportError != null) {
            this.transportError = this.transportError.augmentDescription("trailers: " + trailers);
            this.stateListener.transportReportStatus(this.transportError, false, this.transportErrorMetadata);
        } else {
            Status status = this.statusFromTrailers(trailers);
            this.stateListener.transportReportStatus(status, false, trailers);
        }
    }

    private Status statusFromTrailers(HttpHeaders trailers) {
        String statusString = trailers.get("grpc-status");
        Status status = null;
        if (statusString != null) {
            status = Status.CODE_MARSHALLER.parseAsciiString(statusString.getBytes(Charset.forName("US-ASCII")));
        }
        if (status != null) {
            return status.withDescription(trailers.get("grpc-message"));
        }
        return Status.Code.UNKNOWN.toStatus().withDescription("missing GRPC status in response");
    }

    private static class ClientInboundStateListener
    extends InboundMessage.InboundStateListener {
        final ClientCall.ClientStreamListener listener;
        private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
        private boolean deframerClosed = false;
        private Runnable deframerClosedTask;
        private boolean statusReported;
        private boolean listenerClosed;

        ClientInboundStateListener(int maxMessageSize, ClientCall.ClientStreamListener listener) {
            super(maxMessageSize);
            this.listener = listener;
        }

        private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
            this.decompressorRegistry = decompressorRegistry;
        }

        @Override
        protected ClientCall.ClientStreamListener listener() {
            return this.listener;
        }

        @Override
        public void deframerClosed(boolean hasPartialMessage) {
            this.deframerClosed = true;
            if (this.deframerClosedTask != null) {
                this.deframerClosedTask.run();
                this.deframerClosedTask = null;
            }
        }

        void inboundHeadersReceived(HttpHeaders headers) {
            String streamEncoding = headers.get("content-encoding");
            if (streamEncoding != null) {
                this.deframeFailed(Status.Code.INTERNAL.toStatus().withDescription(String.format("Full stream decompressor for %s is not supported", streamEncoding)).asRuntimeException());
                return;
            }
            String messageEncoding = headers.get("grpc-encoding");
            if (messageEncoding != null) {
                Decompressor decompressor = this.decompressorRegistry.lookupDecompressor(messageEncoding);
                if (decompressor == null) {
                    this.deframeFailed(Status.Code.INTERNAL.toStatus().withDescription(String.format("Can't find decompressor for %s", messageEncoding)).asRuntimeException());
                    return;
                }
                if (decompressor != Codec.Identity.NONE) {
                    this.setDecompressor(decompressor);
                }
            }
            this.listener().headersRead(headers);
        }

        void inboundDataReceived(HttpContent httpContent) {
            this.deframe(httpContent);
        }

        final void transportReportStatus(Status status, boolean stopDelivery, HttpHeaders trailers) {
            if (this.statusReported && !stopDelivery) {
                return;
            }
            this.statusReported = true;
            if (this.deframerClosed) {
                this.deframerClosedTask = null;
                this.closeListener(status, trailers);
            } else {
                this.deframerClosedTask = () -> this.closeListener(status, trailers);
                this.closeDeframer(stopDelivery);
            }
        }

        private void closeListener(Status status, HttpHeaders trailers) {
            if (!this.listenerClosed) {
                this.listenerClosed = true;
                this.listener().closed(status, trailers);
            }
        }

        @Override
        public void deframeFailed(Throwable cause) {
            this.transportReportStatus(Status.fromThrowable(cause), true, (HttpHeaders)new DefaultHttpHeaders());
        }
    }
}

