package io.ballerina.messaging.broker.amqp.codec.handlers;

import io.ballerina.messaging.broker.amqp.AmqpConnectionManager;
import io.ballerina.messaging.broker.amqp.codec.AmqConstant;
import io.ballerina.messaging.broker.amqp.codec.AmqpChannel;
import io.ballerina.messaging.broker.amqp.codec.AmqpChannelFactory;
import io.ballerina.messaging.broker.amqp.codec.AmqpChannelView;
import io.ballerina.messaging.broker.amqp.codec.AmqpChannelWrapper;
import io.ballerina.messaging.broker.amqp.codec.ConnectionException;
import io.ballerina.messaging.broker.amqp.codec.frames.AmqpBadMessage;
import io.ballerina.messaging.broker.amqp.codec.frames.ChannelClose;
import io.ballerina.messaging.broker.amqp.codec.frames.ConnectionClose;
import io.ballerina.messaging.broker.amqp.codec.frames.ConnectionStart;
import io.ballerina.messaging.broker.amqp.codec.frames.GeneralFrame;
import io.ballerina.messaging.broker.amqp.codec.frames.ProtocolInitFrame;
import io.ballerina.messaging.broker.amqp.metrics.AmqpMetricManager;
import io.ballerina.messaging.broker.common.data.types.ShortString;
import io.ballerina.messaging.broker.core.Broker;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ballerina/messaging/broker/amqp/codec/handlers/AmqpConnectionHandler.class */
/**
 * Netty inbound handler that holds the state of one AMQP connection: the channels opened on it, the
 * {@link AmqpChannelView} wrappers exposed for those channels, and the {@link ChannelHandlerContext}
 * used to write frames back to the peer.
 *
 * <p>The channel map and the channel-view map are kept in step: every entry added by
 * {@link #createChannel(int)} is removed again by {@link #closeChannel(int)} or
 * {@link #closeAllChannels()}.
 *
 * <p>NOTE(review): the maps are plain, unsynchronized collections. Whether they are only touched from a
 * single thread cannot be determined from this class alone — confirm before reading them from other threads.
 */
public class AmqpConnectionHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConnectionHandler.class);

    /** Source of the numeric identifier handed to each handler instance. */
    private static final AtomicInteger ID_GENERATOR = new AtomicInteger(0);

    /** Open channels keyed by channel ID. */
    private final Map<Integer, AmqpChannel> channels = new HashMap<>();

    /** Views of the open channels keyed by channel ID; insertion-ordered. */
    private final Map<Integer, AmqpChannelView> channelViews = new LinkedHashMap<>();

    private Broker broker;
    private final AmqpMetricManager metricManager;
    private ChannelHandlerContext ctx;
    private final int id;
    private final long connectedTime;
    private String remoteAddress;
    private final AmqpConnectionManager connectionManager;
    private AmqpChannelFactory amqpChannelFactory;
    private Channel nettyChannel;

    /**
     * Creates a handler for a new connection. Increments the connection count on the metric manager,
     * records the current wall-clock time as the connected time and assigns the next ID from the
     * shared counter.
     *
     * @param amqpMetricManager     metric manager notified of connection and channel count changes
     * @param amqpChannelFactory    factory used by {@link #createChannel(int)}
     * @param amqpConnectionManager manager this handler registers with and unregisters from
     */
    public AmqpConnectionHandler(AmqpMetricManager amqpMetricManager, AmqpChannelFactory amqpChannelFactory, AmqpConnectionManager amqpConnectionManager) {
        this.metricManager = amqpMetricManager;
        this.amqpChannelFactory = amqpChannelFactory;
        this.connectionManager = amqpConnectionManager;
        amqpMetricManager.incrementConnectionCount();
        this.connectedTime = System.currentTimeMillis();
        this.id = ID_GENERATOR.incrementAndGet();
    }

    /**
     * Captures the Netty channel, context and remote address, hooks connection cleanup onto the
     * channel's close future and submits the registration of this handler with the connection manager.
     *
     * @param channelHandlerContext context of the channel being registered
     * @throws Exception declared by the overridden handler method
     */
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.nettyChannel = channelHandlerContext.channel();
        this.ctx = channelHandlerContext;
        // NOTE(review): the callbacks below are passed down the pipeline as inbound messages; presumably
        // a later handler executes them. The functional-interface type they are expected to have is not
        // visible in this file — confirm against the pipeline setup.
        this.nettyChannel.closeFuture().addListener(future -> {
            channelHandlerContext.fireChannelRead(this::onConnectionClose);
        });
        this.remoteAddress = channelHandlerContext.channel().remoteAddress().toString();
        channelHandlerContext.fireChannelRead(() -> {
            this.connectionManager.addConnectionHandler(this);
        });
    }

    /**
     * Dispatches a decoded inbound object: protocol-init frames are answered directly, general frames
     * handle themselves against this connection, and bad messages are logged and close the channel.
     * Any other object is ignored.
     *
     * @param channelHandlerContext context the object arrived on
     * @param obj                   decoded inbound object
     * @throws Exception declared by the overridden handler method
     */
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj instanceof ProtocolInitFrame) {
            handleProtocolInit(channelHandlerContext, (ProtocolInitFrame) obj);
        } else if (obj instanceof GeneralFrame) {
            ((GeneralFrame) obj).handle(channelHandlerContext, this);
        } else if (obj instanceof AmqpBadMessage) {
            LOGGER.warn("Bad message received", ((AmqpBadMessage) obj).getCause());
            channelHandlerContext.close();
        }
    }

    /**
     * Turns auto-read off when the channel is no longer writable at the end of a read batch.
     *
     * @param channelHandlerContext context of the channel that finished reading
     */
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) {
        if (channelHandlerContext.channel().isWritable()) {
            return;
        }
        channelHandlerContext.channel().config().setAutoRead(false);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Auto read set to false in channel {}", getRemoteAddress(channelHandlerContext));
        }
    }

    /**
     * Turns auto-read back on once the channel reports itself writable again.
     *
     * @param channelHandlerContext context of the channel whose writability changed
     */
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) {
        if (!channelHandlerContext.channel().isWritable()) {
            return;
        }
        channelHandlerContext.channel().config().setAutoRead(true);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Auto read set to true in channel {}", getRemoteAddress(channelHandlerContext));
        }
    }

    /** Returns the remote address of the channel behind the given context. */
    private SocketAddress getRemoteAddress(ChannelHandlerContext channelHandlerContext) {
        return channelHandlerContext.channel().remoteAddress();
    }

    /**
     * Logs the failure and closes the channel.
     *
     * @param channelHandlerContext context the exception was raised on
     * @param th                    the exception that reached this handler
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        LOGGER.warn("Exception while handling request", th);
        channelHandlerContext.close();
    }

    /** Cleanup for a closed connection: closes all channels, updates metrics and unregisters this handler. */
    private void onConnectionClose() {
        closeAllChannels();
        this.metricManager.decrementConnectionCount();
        this.connectionManager.removeConnectionHandler(this);
    }

    /**
     * Answers a protocol-init frame: a frame equal to {@code ProtocolInitFrame.V_091} gets the default
     * connection-start frame; any other frame gets {@code ProtocolInitFrame.V_091} written back.
     */
    private void handleProtocolInit(ChannelHandlerContext channelHandlerContext, ProtocolInitFrame protocolInitFrame) {
        if (ProtocolInitFrame.V_091.equals(protocolInitFrame)) {
            channelHandlerContext.writeAndFlush(ConnectionStart.DEFAULT_FRAME);
        } else {
            channelHandlerContext.writeAndFlush(ProtocolInitFrame.V_091);
        }
    }

    /**
     * Creates and registers a channel with the given ID, together with its view, and increments the
     * channel count metric.
     *
     * @param i channel ID
     * @throws ConnectionException with {@code CHANNEL_ERROR} if a channel with this ID is already registered
     */
    public void createChannel(int i) throws ConnectionException {
        final Integer channelId = i;
        if (this.channels.get(channelId) != null) {
            throw new ConnectionException(ConnectionException.CHANNEL_ERROR, "Channel ID " + i + " Already exists");
        }
        AmqpChannel channel = this.amqpChannelFactory.createChannel(this.broker, i, this);
        this.channels.put(channelId, channel);
        this.channelViews.put(channelId, new AmqpChannelWrapper(channel));
        this.metricManager.incrementChannelCount();
    }

    /**
     * Looks up a registered channel.
     *
     * @param i channel ID
     * @return the channel, or {@code null} if no channel with this ID is registered
     */
    public AmqpChannel getChannel(int i) {
        return this.channels.get(Integer.valueOf(i));
    }

    /**
     * Unregisters and closes the channel with the given ID, if present. The matching channel view is
     * removed as well so that {@link #getChannelViews()} never reports a closed channel.
     *
     * @param i channel ID
     */
    public void closeChannel(int i) {
        final Integer channelId = i;
        // Drop the view first; it must not outlive the channel it wraps.
        this.channelViews.remove(channelId);
        AmqpChannel removed = this.channels.remove(channelId);
        if (removed != null) {
            closeChannel(removed);
        }
    }

    /** Decrements the channel count metric and closes the given channel. */
    private void closeChannel(AmqpChannel amqpChannel) {
        this.metricManager.decrementChannelCount();
        amqpChannel.close();
    }

    /** Closes every registered channel and empties both the channel map and the channel-view map. */
    public void closeAllChannels() {
        for (AmqpChannel channel : this.channels.values()) {
            closeChannel(channel);
        }
        this.channels.clear();
        this.channelViews.clear();
    }

    /** Returns the broker attached via {@link #attachBroker(Broker)}, or {@code null} if none was attached. */
    public Broker getBroker() {
        return this.broker;
    }

    /**
     * Sets the broker passed to the channel factory when channels are created.
     *
     * @param broker broker to attach
     */
    public void attachBroker(Broker broker) {
        this.broker = broker;
    }

    /** Returns whether the underlying Netty channel is currently writable. */
    public boolean isWritable() {
        return this.nettyChannel.isWritable();
    }

    /** Returns the string form of the remote address captured in {@code channelRegistered}. */
    public String getRemoteAddress() {
        return this.remoteAddress;
    }

    /** Returns the number of currently registered channels. */
    public int getChannelCount() {
        return this.channels.size();
    }

    /** Returns the {@link System#currentTimeMillis()} value recorded when this handler was constructed. */
    public long getConnectedTime() {
        return this.connectedTime;
    }

    /** Returns the identifier assigned to this handler at construction. */
    public int getId() {
        return this.id;
    }

    /**
     * Writes a {@code ConnectionClose} frame with the {@code CONNECTION_FORCED} code to the peer.
     *
     * @param str reason, logged and appended to the frame's reply text
     * @return number of channels registered on this connection when the call was made
     */
    public int closeConnection(String str) {
        LOGGER.info("Closing connection {}. Reason: {}", getId(), str);
        int channelCount = this.channels.size();
        this.ctx.writeAndFlush(new ConnectionClose(AmqConstant.CONNECTION_FORCED, ShortString.parseString("Broker forced close connection. " + str), 0, 0));
        return channelCount;
    }

    /**
     * Closes the underlying context without sending a close frame and logs the outcome once the close
     * completes.
     *
     * @param str reason, logged only
     * @return number of channels registered on this connection when the call was made
     */
    public int forceCloseConnection(String str) {
        LOGGER.info("Force closing connection {}. Reason: {}", getId(), str);
        int channelCount = this.channels.size();
        this.ctx.close().addListener(future -> {
            if (future.isSuccess()) {
                LOGGER.info("Connection {} forcefully closed successfully.", getId());
            } else {
                LOGGER.error("Error occurred while closing connection {}", getId(), future.cause());
            }
        });
        return channelCount;
    }

    /**
     * Writes a {@code ChannelClose} frame with the {@code CHANNEL_CLOSED} code for the given channel.
     * The channel is not removed from this handler here.
     *
     * @param i   channel ID
     * @param str reason, logged and appended to the frame's reply text
     */
    public void forceDisconnectChannel(int i, String str) {
        LOGGER.info("Force closing channel {} of connection {}. Reason: {}", i, getId(), str);
        this.ctx.writeAndFlush(new ChannelClose(i, AmqConstant.CHANNEL_CLOSED, ShortString.parseString("Broker forced close channel. " + str), 0, 0));
    }

    /**
     * Returns the views of the currently registered channels, in the order the channels were created.
     * The returned collection is backed by this handler's internal map.
     */
    public Collection<AmqpChannelView> getChannelViews() {
        return this.channelViews.values();
    }
}
