package io.kroxylicious.proxy.internal;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kroxylicious/proxy/internal/KafkaProxyBackendHandler.class */
/**
 * Netty inbound handler that relays everything read on its own channel to the
 * channel of a second, "inbound" context supplied at construction time.
 *
 * <p>Besides forwarding messages, the handler applies simple back-pressure:
 * when the inbound channel is not writable at the end of a read batch,
 * auto-read is switched off on this handler's channel, and it is switched
 * back on by {@link #inboundChannelWritabilityChanged(ChannelHandlerContext)}
 * once the inbound channel reports itself writable again.
 *
 * <p>NOTE(review): the mutable fields are not synchronized; presumably every
 * callback runs on the same event loop as the inbound channel - confirm.
 */
public class KafkaProxyBackendHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProxyBackendHandler.class);

    /** Handler that is told when this channel becomes active or changes writability. */
    private final KafkaProxyFrontendHandler frontendHandler;

    /** Context whose channel receives every message read by this handler. Never {@code null}. */
    private final ChannelHandlerContext inboundCtx;

    /**
     * Context of the channel whose auto-read was switched off by
     * {@link #channelReadComplete(ChannelHandlerContext)}, or {@code null} when
     * reads are not currently paused.
     */
    private ChannelHandlerContext blockedOutboundCtx;

    /** {@code true} while messages have been written to the inbound channel but not yet flushed. */
    private boolean unflushedWrites;

    /**
     * Creates the handler.
     *
     * @param kafkaProxyFrontendHandler handler notified of activity and writability
     *                                  changes on this handler's channel (not null-checked here)
     * @param channelHandlerContext     context of the channel that messages are forwarded to
     * @throws NullPointerException if {@code channelHandlerContext} is {@code null}
     */
    public KafkaProxyBackendHandler(KafkaProxyFrontendHandler kafkaProxyFrontendHandler, ChannelHandlerContext channelHandlerContext) {
        this.frontendHandler = kafkaProxyFrontendHandler;
        this.inboundCtx = Objects.requireNonNull(channelHandlerContext, "channelHandlerContext");
    }

    /**
     * Netty callback: propagates the event down the pipeline and then reports
     * the writability change of this handler's channel to the frontend handler.
     *
     * @param channelHandlerContext context of the channel whose writability changed
     * @throws Exception if the superclass implementation fails
     */
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelWritabilityChanged(channelHandlerContext);
        this.frontendHandler.outboundWritabilityChanged(channelHandlerContext);
    }

    /**
     * Re-enables auto-read on the paused channel, if there is one, once the
     * inbound channel is writable again. Does nothing when no channel is paused
     * or the inbound channel is still not writable.
     *
     * <p>Presumably invoked by the frontend handler when the inbound channel's
     * writability changes - verify against the caller.
     *
     * @param channelHandlerContext the inbound context; with assertions enabled it
     *                              must be the same instance passed to the constructor
     */
    public void inboundChannelWritabilityChanged(ChannelHandlerContext channelHandlerContext) {
        assert channelHandlerContext == this.inboundCtx : "unexpected inbound context";
        ChannelHandlerContext blocked = this.blockedOutboundCtx;
        if (blocked != null && channelHandlerContext.channel().isWritable()) {
            // Clear the marker before resuming so channelRead's assertion holds
            // for any read triggered by re-enabling auto-read.
            this.blockedOutboundCtx = null;
            blocked.channel().config().setAutoRead(true);
        }
    }

    /**
     * Netty callback: logs the event, propagates it down the pipeline and then
     * tells the frontend handler that this channel is active.
     *
     * @param channelHandlerContext context of the channel that became active
     * @throws Exception if the superclass implementation fails
     */
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        LOGGER.trace("Channel active {}", channelHandlerContext);
        super.channelActive(channelHandlerContext);
        this.frontendHandler.outboundChannelActive(channelHandlerContext);
    }

    /**
     * Netty callback: forwards the message to the inbound channel.
     *
     * <p>While the inbound channel is writable the message is only queued and the
     * flush is deferred to {@link #channelReadComplete(ChannelHandlerContext)};
     * otherwise the message is written and flushed immediately.
     *
     * @param channelHandlerContext context of the channel the message was read from
     * @param obj                   the message to forward
     */
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        // Reads are not expected while auto-read is paused.
        assert this.blockedOutboundCtx == null : "read received while reads are paused";
        LOGGER.trace("Channel read {}", obj);
        Channel inboundChannel = this.inboundCtx.channel();
        if (inboundChannel.isWritable()) {
            inboundChannel.write(obj, this.inboundCtx.voidPromise());
            this.unflushedWrites = true;
        } else {
            // writeAndFlush also flushes anything queued earlier, so nothing is left pending.
            inboundChannel.writeAndFlush(obj, this.inboundCtx.voidPromise());
            this.unflushedWrites = false;
        }
    }

    /**
     * Netty callback: flushes any writes queued during the read batch and, if
     * the inbound channel is not writable afterwards, switches off auto-read on
     * this handler's channel and remembers its context so that
     * {@link #inboundChannelWritabilityChanged(ChannelHandlerContext)} can resume it.
     *
     * @param channelHandlerContext context of the channel that finished a read batch
     * @throws Exception if the superclass implementation fails
     */
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelReadComplete(channelHandlerContext);
        Channel inboundChannel = this.inboundCtx.channel();
        if (this.unflushedWrites) {
            this.unflushedWrites = false;
            inboundChannel.flush();
        }
        if (!inboundChannel.isWritable()) {
            channelHandlerContext.channel().config().setAutoRead(false);
            this.blockedOutboundCtx = channelHandlerContext;
        }
    }

    /**
     * Netty callback: when this handler's channel goes inactive, hands the
     * inbound channel to {@code KafkaProxyFrontendHandler.closeOnFlush}
     * (judging by its name, that closes the channel after pending writes are
     * flushed - confirm against its implementation). The event is not
     * propagated further down the pipeline.
     *
     * @param channelHandlerContext context of the channel that became inactive
     */
    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        KafkaProxyFrontendHandler.closeOnFlush(this.inboundCtx.channel());
    }

    /**
     * Netty callback: logs the failure at WARN level (message plus stack trace)
     * and hands this handler's own channel to
     * {@code KafkaProxyFrontendHandler.closeOnFlush}.
     *
     * @param channelHandlerContext context of the channel on which the error surfaced
     * @param th                    the error that was caught
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        LOGGER.warn("Netty caught exception from the backend: {}", th.getMessage(), th);
        KafkaProxyFrontendHandler.closeOnFlush(channelHandlerContext.channel());
    }
}
