/*
 * Decompiled with CFR 0.152.
 */
package io.kroxylicious.proxy.internal;

import io.kroxylicious.proxy.internal.KafkaProxyFrontendHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProxyBackendHandler
extends ChannelInboundHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProxyBackendHandler.class);
    private final KafkaProxyFrontendHandler frontendHandler;
    private final ChannelHandlerContext inboundCtx;
    private ChannelHandlerContext blockedOutboundCtx;
    private boolean unflushedWrites;

    public KafkaProxyBackendHandler(KafkaProxyFrontendHandler frontendHandler, ChannelHandlerContext inboundCtx) {
        this.frontendHandler = frontendHandler;
        this.inboundCtx = Objects.requireNonNull(inboundCtx);
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
        this.frontendHandler.outboundWritabilityChanged(ctx);
    }

    public void inboundChannelWritabilityChanged(ChannelHandlerContext inboundCtx) {
        assert (inboundCtx == this.inboundCtx);
        ChannelHandlerContext outboundCtx = this.blockedOutboundCtx;
        if (outboundCtx != null && inboundCtx.channel().isWritable()) {
            this.blockedOutboundCtx = null;
            outboundCtx.channel().config().setAutoRead(true);
        }
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.trace("Channel active {}", (Object)ctx);
        super.channelActive(ctx);
        this.frontendHandler.outboundChannelActive(ctx);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        assert (this.blockedOutboundCtx == null);
        LOGGER.trace("Channel read {}", msg);
        Channel inboundChannel = this.inboundCtx.channel();
        if (inboundChannel.isWritable()) {
            inboundChannel.write(msg, this.inboundCtx.voidPromise());
            this.unflushedWrites = true;
        } else {
            inboundChannel.writeAndFlush(msg, this.inboundCtx.voidPromise());
            this.unflushedWrites = false;
        }
    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
        Channel inboundChannel = this.inboundCtx.channel();
        if (this.unflushedWrites) {
            this.unflushedWrites = false;
            inboundChannel.flush();
        }
        if (!inboundChannel.isWritable()) {
            ctx.channel().config().setAutoRead(false);
            this.blockedOutboundCtx = ctx;
        }
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        KafkaProxyFrontendHandler.closeOnFlush(this.inboundCtx.channel());
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        LOGGER.warn("Netty caught exception from the backend: {}", (Object)cause.getMessage(), (Object)cause);
        KafkaProxyFrontendHandler.closeOnFlush(ctx.channel());
    }
}

