/*
 * Decompiled with CFR 0.152.
 */
package com.uber.tchannel.handlers;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.uber.tchannel.api.SubChannel;
import com.uber.tchannel.api.TChannel;
import com.uber.tchannel.api.handlers.RequestHandler;
import com.uber.tchannel.errors.ErrorType;
import com.uber.tchannel.frames.ErrorFrame;
import com.uber.tchannel.headers.ArgScheme;
import com.uber.tchannel.messages.Request;
import com.uber.tchannel.messages.Response;
import com.uber.tchannel.messages.ResponseMessage;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Inbound channel handler that routes each incoming {@link Request} to a
 * {@link RequestHandler} and writes the resulting {@link Response} back to the channel.
 *
 * <p>The handler is looked up by service name and endpoint on the top-level
 * {@link TChannel}; if none is registered, the channel's default user handler is used.
 * Handlers are invoked on the executor passed to the constructor. Finished responses are
 * placed on an internal queue and written out while the channel reports itself writable.
 */
public class RequestRouter
extends SimpleChannelInboundHandler<Request> {
    private static final Logger logger = LoggerFactory.getLogger(RequestRouter.class);
    private final TChannel topChannel;
    private final ListeningExecutorService listeningExecutorService;
    /** Set while one caller is draining {@link #responseQueue}; other callers back off. */
    private final AtomicBoolean busy = new AtomicBoolean(false);
    /** Responses produced by handlers that have not yet been written to the channel. */
    private final ConcurrentLinkedQueue<Response> responseQueue = new ConcurrentLinkedQueue<Response>();

    /**
     * Creates a router.
     *
     * @param topChannel      channel used to look up sub-channels and the default handler
     * @param executorService executor on which request handlers are run
     */
    public RequestRouter(TChannel topChannel, ExecutorService executorService) {
        this.topChannel = topChannel;
        this.listeningExecutorService = MoreExecutors.listeningDecorator(executorService);
    }

    /**
     * Looks up the handler registered for {@code endpoint} on the sub-channel of {@code service}.
     *
     * @return the handler returned by the sub-channel, or {@code null} when no sub-channel
     *         exists for the service
     */
    private RequestHandler getRequestHandler(String service, String endpoint) {
        SubChannel subChannel = this.topChannel.getSubChannel(service);
        RequestHandler handler = null;
        if (subChannel != null) {
            handler = subChannel.getRequestHandler(endpoint);
        }
        return handler;
    }

    /**
     * Validates an incoming request, resolves its handler and submits the handler call to
     * the executor.
     *
     * <p>A request arriving on an inactive channel is released and dropped. A request that
     * lacks the {@code "as"} transport header, a service name, an endpoint, or a resolvable
     * handler is answered with a {@code BadRequest} error frame.
     *
     * @param ctx     context of the channel the request arrived on
     * @param request the incoming request
     */
    @Override
    protected void messageReceived(final ChannelHandlerContext ctx, final Request request) {
        if (!ctx.channel().isActive()) {
            logger.warn("drop request when channel is inActive");
            request.release();
            return;
        }
        // NOTE(review): the error paths below hand the request to ErrorFrame.sendError without
        // releasing it here; presumably sendError takes ownership -- confirm.
        ArgScheme argScheme = ArgScheme.toScheme(request.getTransportHeaders().get("as"));
        if (argScheme == null) {
            ErrorFrame.sendError(ErrorType.BadRequest, "Expected incoming call to have \"as\" header set", request, ctx);
            return;
        }
        String service = request.getService();
        if (service == null || service.isEmpty()) {
            ErrorFrame.sendError(ErrorType.BadRequest, "Expected incoming call to have serviceName", request, ctx);
            return;
        }
        String endpoint = request.getArg1().toString(CharsetUtil.UTF_8);
        if (endpoint == null || endpoint.isEmpty()) {
            ErrorFrame.sendError(ErrorType.BadRequest, "Expected incoming call to have endpoint", request, ctx);
            return;
        }
        RequestHandler handler = this.getRequestHandler(service, endpoint);
        if (handler == null) {
            // Fall back to the channel-wide default handler when nothing matches.
            handler = this.topChannel.getDefaultUserHandler();
        }
        if (handler == null) {
            ErrorFrame.sendError(ErrorType.BadRequest, "Invalid handler function", request, ctx);
            return;
        }
        ListenableFuture<Response> responseFuture = this.listeningExecutorService.submit(new CallableHandler(handler, request));
        Futures.addCallback(responseFuture, new FutureCallback<Response>(){

            @Override
            public void onSuccess(Response response) {
                // The channel may have closed while the handler was running; nothing can be
                // written any more, so just give the response back.
                if (!ctx.channel().isActive()) {
                    response.release();
                    return;
                }
                RequestRouter.this.responseQueue.offer(response);
                RequestRouter.this.sendResponse(ctx);
            }

            @Override
            public void onFailure(Throwable throwable) {
                logger.error("Failed to handle the request due to exception.", throwable);
                ErrorFrame.sendError(ErrorType.UnexpectedError, "Failed to handle the request: " + throwable.getMessage(), request, ctx);
            }
        });
    }

    /**
     * Attempts to drain queued responses whenever the channel's writability changes.
     *
     * @param ctx context of the channel whose writability changed
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        this.sendResponse(ctx);
    }

    /**
     * Writes queued responses to the channel for as long as it is writable, then flushes once
     * if anything was written.
     *
     * <p>Only one caller drains at a time: a caller that finds {@link #busy} already set
     * returns immediately and leaves its response on the queue for the active drainer.
     *
     * <p>NOTE(review): the decompiler reported that it restructured the try/finally in this
     * method ("possible behaviour change"); verify against the original source if available.
     *
     * @param ctx context of the channel to write to
     */
    protected void sendResponse(ChannelHandlerContext ctx) {
        if (!this.busy.compareAndSet(false, true)) {
            return;
        }
        Channel channel = ctx.channel();
        try {
            Response res;
            boolean flush = false;
            while (channel.isWritable() && (res = this.responseQueue.poll()) != null) {
                channel.write(res);
                flush = true;
            }
            if (flush) {
                channel.flush();
            }
        }
        finally {
            this.busy.set(false);
        }
        // A response can be queued by a caller that backed off above after this thread's last
        // poll; re-check now that the flag is clear so such a response is not left waiting.
        if (channel.isWritable() && !this.responseQueue.isEmpty()) {
            this.sendResponse(ctx);
        }
    }

    /**
     * Releases every response still queued when the channel becomes inactive.
     *
     * @param ctx context of the channel that became inactive
     * @throws Exception if the superclass handler throws
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        // Loop on poll() rather than isEmpty(): another thread may take the last element
        // between an isEmpty() check and the poll(), which would yield null here.
        Response pending;
        while ((pending = this.responseQueue.poll()) != null) {
            pending.release();
        }
    }

    /**
     * Task that invokes a {@link RequestHandler} with a {@link Request} and returns the
     * handler's {@link Response}.
     */
    private static class CallableHandler
    implements Callable<Response> {
        private final Request request;
        private final RequestHandler handler;

        public CallableHandler(RequestHandler handler, Request request) {
            this.handler = handler;
            this.request = request;
        }

        /**
         * Runs the handler and releases the request once the handler has returned.
         *
         * <p>If the handler throws, the request is not released here; the exception
         * propagates to the caller of this task.
         *
         * @return the response produced by the handler
         * @throws Exception whatever the handler throws
         */
        @Override
        public Response call() throws Exception {
            Response response = this.handler.handle(this.request);
            this.request.release();
            return response;
        }
    }
}

