package io.ballerina.messaging.broker.amqp.codec;

import io.ballerina.messaging.broker.amqp.AmqpServerConfiguration;
import io.ballerina.messaging.broker.amqp.codec.flow.ChannelFlowManager;
import io.ballerina.messaging.broker.amqp.codec.handlers.AmqpConnectionHandler;
import io.ballerina.messaging.broker.amqp.consumer.AckData;
import io.ballerina.messaging.broker.amqp.consumer.AmqpConsumer;
import io.ballerina.messaging.broker.amqp.consumer.AmqpDeliverMessage;
import io.ballerina.messaging.broker.amqp.metrics.AmqpMetricManager;
import io.ballerina.messaging.broker.common.ResourceNotFoundException;
import io.ballerina.messaging.broker.common.ValidationException;
import io.ballerina.messaging.broker.common.data.types.FieldTable;
import io.ballerina.messaging.broker.common.data.types.ShortString;
import io.ballerina.messaging.broker.core.Broker;
import io.ballerina.messaging.broker.core.BrokerException;
import io.ballerina.messaging.broker.core.Consumer;
import io.ballerina.messaging.broker.core.Message;
import io.ballerina.messaging.broker.core.transaction.AutoCommitTransaction;
import io.ballerina.messaging.broker.core.transaction.BrokerTransaction;
import io.ballerina.messaging.broker.core.transaction.DistributedTransaction;
import io.ballerina.messaging.broker.core.transaction.LocalTransaction;
import io.ballerina.messaging.broker.core.util.MessageTracer;
import io.ballerina.messaging.broker.core.util.TraceField;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.Xid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ballerina/messaging/broker/amqp/codec/AmqpChannel.class */
/**
 * State of a single AMQP channel.
 *
 * <p>The channel keeps track of the consumers registered through it, the deliveries that are still
 * waiting for an acknowledgement, messages that are held back for later delivery, and the
 * {@link BrokerTransaction} currently in effect. Most operations are forwarded to the {@link Broker}.
 */
public class AmqpChannel implements AmqpChannelView {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpChannel.class);
    private static final String ACKNOWLEDGE_RECEIVED = "Acknowledgement received from AMQP transport.";
    private static final String UNKNOWN_ACKNOWLEDGEMENT = "Matching message for acknowledgment not found.";
    private static final String REJECT_RECEIVED = "Message reject received from AMQP transport.";
    private static final String UNKNOWN_REJECT = "Matching message for reject not found.";
    public static final String DELIVERY_TAG_FIELD_NAME = "deliveryTag";
    public static final String CHANNEL_ID_FIELD_NAME = "channelId";
    private static final String REQUEUE_FLAG_FIELD_NAME = "requeueFlag";

    private final Broker broker;
    private final int channelId;
    private final AmqpMetricManager metricManager;
    private final AmqpConnectionHandler connection;
    private final InMemoryMessageAggregator messageAggregator;
    private final ChannelFlowManager flowManager;
    private final int maxRedeliveryCount;
    /** Trace field carrying this channel's id; reused for every trace entry written by the channel. */
    private final TraceField traceChannelIdField;
    /** Transaction currently in effect; starts as auto-commit and is replaced by the set*Transactional methods. */
    private BrokerTransaction transaction;
    /** Number of pending acknowledgements at which {@link #hasRoom} is switched off. */
    private int prefetchCount;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger consumerTagGenerator = new AtomicInteger(0);
    private final AtomicLong deliveryTagGenerator = new AtomicLong(0);
    private final UnackedMessageMap unackedMessageMap = new UnackedMessageMap();
    private final AtomicBoolean flow = new AtomicBoolean(true);
    /** False while the number of pending acknowledgements has reached {@link #prefetchCount}. */
    private final AtomicBoolean hasRoom = new AtomicBoolean(true);
    private final List<AmqpDeliverMessage> deliveryPendingMessages = new ArrayList<>();
    private final Map<ShortString, AmqpConsumer> consumerMap = new HashMap<>();
    private final long createdTime = System.currentTimeMillis();

    /**
     * Bookkeeping for delivered messages, keyed by delivery tag.
     *
     * <p>An entry lives in one of two maps: {@code pendingAcknowledgments} until the client responds,
     * and {@code markedAcknowledgments} after an acknowledgement has been received but not yet
     * finalised (see {@link #removeMarkedAcknowledgment(long)} and {@link #resetMarkedAcknowledgments()}).
     * Only the size of the pending map is compared with the channel's prefetch count when the
     * enclosing channel's {@code hasRoom} flag is updated.
     *
     * <p>Declared {@code public} with a private constructor in this source; a decompiler note on the
     * original declaration indicates the class was originally {@code private}.
     */
    public class UnackedMessageMap {
        private final Map<Long, AckData> pendingAcknowledgments;
        private final Map<Long, AckData> markedAcknowledgments;

        private UnackedMessageMap() {
            this.pendingAcknowledgments = new LinkedHashMap<>();
            this.markedAcknowledgments = new LinkedHashMap<>();
        }

        /**
         * Moves the entry for the given delivery tag from the pending map to the marked map.
         *
         * @param j delivery tag
         * @return the moved entry, or {@code null} when no pending entry exists for the tag
         */
        AckData markAcknowledgement(long j) {
            AckData ackData = this.pendingAcknowledgments.remove(j);
            if (ackData != null) {
                this.markedAcknowledgments.put(j, ackData);
            }
            return ackData;
        }

        /**
         * Removes the pending entry for the given delivery tag and re-evaluates {@code hasRoom}.
         *
         * @param j delivery tag
         * @return the removed entry, or {@code null} when the tag is unknown
         */
        AckData negativeAcknowledge(long j) {
            AckData ackData = this.pendingAcknowledgments.remove(j);
            checkAndEnableHasRoom();
            return ackData;
        }

        /**
         * Records a new pending entry and switches {@code hasRoom} off if the prefetch count is reached.
         *
         * @param j       delivery tag
         * @param ackData data needed to acknowledge or requeue the delivery later
         */
        void put(long j, AckData ackData) {
            this.pendingAcknowledgments.put(j, ackData);
            checkAndDisableHasRoom();
        }

        /**
         * Empties both maps and sets {@code hasRoom} to {@code true}.
         *
         * @return all removed entries, pending ones first, followed by marked ones
         */
        Collection<AckData> removeAll() {
            List<AckData> removed = new ArrayList<>(this.pendingAcknowledgments.values());
            this.pendingAcknowledgments.clear();
            removed.addAll(this.markedAcknowledgments.values());
            this.markedAcknowledgments.clear();
            AmqpChannel.this.hasRoom.set(true);
            return removed;
        }

        /**
         * Removes a single marked entry and re-evaluates {@code hasRoom}.
         *
         * @param j delivery tag
         * @return the removed entry, or {@code null} when the tag is not marked
         */
        AckData removeMarkedAcknowledgment(long j) {
            AckData ackData = this.markedAcknowledgments.remove(j);
            checkAndEnableHasRoom();
            return ackData;
        }

        /**
         * Sets {@code hasRoom} back to {@code true} when it is currently {@code false} and the pending
         * map holds fewer entries than the prefetch count.
         */
        private void checkAndEnableHasRoom() {
            // NOTE(review): with prefetchCount == 0 this condition can never hold; confirm that callers
            // always set a positive prefetch count (AMQP defines 0 as "no limit").
            if (!AmqpChannel.this.hasRoom.get()
                    && this.pendingAcknowledgments.size() < AmqpChannel.this.prefetchCount) {
                AmqpChannel.this.hasRoom.set(true);
            }
        }

        /**
         * Moves every marked entry back to the pending map and re-checks whether the prefetch count
         * has been reached.
         */
        void resetMarkedAcknowledgments() {
            this.pendingAcknowledgments.putAll(this.markedAcknowledgments);
            this.markedAcknowledgments.clear();
            checkAndDisableHasRoom();
        }

        /**
         * Empties the marked map and re-evaluates {@code hasRoom}.
         *
         * @return the entries that were marked
         */
        Collection<AckData> removeMarkedAcknowledgments() {
            List<AckData> removed = new ArrayList<>(this.markedAcknowledgments.values());
            this.markedAcknowledgments.clear();
            checkAndEnableHasRoom();
            return removed;
        }

        /**
         * Sets {@code hasRoom} to {@code false} when it is currently {@code true} and the pending map
         * has reached the prefetch count.
         */
        private void checkAndDisableHasRoom() {
            if (AmqpChannel.this.hasRoom.get()
                    && this.pendingAcknowledgments.size() >= AmqpChannel.this.prefetchCount) {
                AmqpChannel.this.hasRoom.set(false);
            }
        }

        /**
         * Releases and drops every entry (pending or marked) that belongs to the given queue.
         *
         * @param str queue name to match against {@link AckData#getQueueName()}
         */
        void releaseAndRemoveMessages(String str) {
            releaseAndRemoveMessagesFromMap(str, this.pendingAcknowledgments);
            releaseAndRemoveMessagesFromMap(str, this.markedAcknowledgments);
            checkAndEnableHasRoom();
        }

        private void releaseAndRemoveMessagesFromMap(String str, Map<Long, AckData> map) {
            Iterator<AckData> entries = map.values().iterator();
            while (entries.hasNext()) {
                AckData ackData = entries.next();
                if (ackData.getQueueName().equals(str)) {
                    ackData.getMessage().release();
                    entries.remove();
                }
            }
        }
    }

    /**
     * Creates a channel that starts out with an {@link AutoCommitTransaction}.
     *
     * @param amqpServerConfiguration source of the channel-flow limits and the max redelivery count
     * @param broker                  broker all operations are forwarded to
     * @param i                       channel id
     * @param amqpMetricManager       metrics sink for consumer counts and rejects
     * @param amqpConnectionHandler   connection this channel belongs to
     */
    public AmqpChannel(AmqpServerConfiguration amqpServerConfiguration, Broker broker, int i,
                       AmqpMetricManager amqpMetricManager, AmqpConnectionHandler amqpConnectionHandler) {
        this.broker = broker;
        this.channelId = i;
        this.metricManager = amqpMetricManager;
        this.connection = amqpConnectionHandler;
        this.transaction = new AutoCommitTransaction(broker);
        // The aggregator is created after the transaction because it is handed the transaction instance.
        this.messageAggregator = new InMemoryMessageAggregator(this.transaction);
        this.flowManager = new ChannelFlowManager(this,
                amqpServerConfiguration.getChannelFlow().getLowLimit(),
                amqpServerConfiguration.getChannelFlow().getHighLimit());
        this.maxRedeliveryCount = amqpServerConfiguration.getMaxRedeliveryCount();
        this.traceChannelIdField = new TraceField(CHANNEL_ID_FIELD_NAME, Integer.valueOf(i));
    }

    /** Forwards the arguments unchanged to {@code Broker.declareExchange}. */
    public void declareExchange(String str, String str2, boolean z, boolean z2)
            throws BrokerException, ValidationException {
        this.broker.declareExchange(str, str2, z, z2);
    }

    /** Forwards the arguments unchanged to {@code Broker.deleteExchange}. */
    public void deleteExchange(String str, boolean z)
            throws BrokerException, ValidationException, ResourceNotFoundException {
        this.broker.deleteExchange(str, z);
    }

    /** Calls {@code Broker.createQueue} with the string form of {@code shortString} and the three flags. */
    public void declareQueue(ShortString shortString, boolean z, boolean z2, boolean z3)
            throws BrokerException, ValidationException {
        this.broker.createQueue(shortString.toString(), z, z2, z3);
    }

    /** Calls {@code Broker.bind} with the string forms of the three short strings and the field table. */
    public void bind(ShortString shortString, ShortString shortString2, ShortString shortString3,
                     FieldTable fieldTable) throws BrokerException, ValidationException {
        this.broker.bind(shortString.toString(), shortString2.toString(), shortString3.toString(), fieldTable);
    }

    /** Calls {@code Broker.unbind} with the string forms of the three short strings. */
    public void unbind(ShortString shortString, ShortString shortString2, ShortString shortString3)
            throws BrokerException, ValidationException {
        this.broker.unbind(shortString.toString(), shortString2.toString(), shortString3.toString());
    }

    /**
     * Creates a consumer, registers it with this channel and the broker, and bumps the consumer metric.
     *
     * @param shortString           passed to the consumer in string form (presumably the queue name;
     *                              confirm against the {@code AmqpConsumer} constructor)
     * @param shortString2          consumer tag; when empty, a tag of the form {@code "sgen" + n} is generated
     * @param z                     flag passed through to the {@code AmqpConsumer} constructor
     * @param channelHandlerContext Netty context handed to the consumer
     * @return the newly created consumer
     * @throws BrokerException if the consumer cannot be created or registered
     */
    public AmqpConsumer consume(ShortString shortString, ShortString shortString2, boolean z,
                                ChannelHandlerContext channelHandlerContext) throws BrokerException {
        ShortString consumerTag = shortString2;
        if (consumerTag.isEmpty()) {
            consumerTag = ShortString.parseString("sgen" + getNextConsumerTag());
        }
        AmqpConsumer consumer = new AmqpConsumer(channelHandlerContext, this.broker, this,
                shortString.toString(), consumerTag, z);
        // NOTE(review): the consumer is placed in the map before broker registration succeeds, and an
        // existing consumer with the same tag is silently replaced; confirm both are intended.
        this.consumerMap.put(consumerTag, consumer);
        this.broker.addConsumer(consumer);
        this.metricManager.incrementConsumerCount();
        return consumer;
    }

    /**
     * Marks the channel closed, closes every consumer, notifies the transaction, and requeues all
     * deliveries that are still unacknowledged.
     */
    public void close() {
        this.closed.set(true);
        for (AmqpConsumer consumer : this.consumerMap.values()) {
            closeConsumer(consumer);
        }
        this.transaction.onClose();
        this.consumerMap.clear();
        requeueUnackedMessages();
    }

    /**
     * Requeues everything left in the unacked map. Failures are logged per message and do not stop
     * the remaining messages from being processed.
     */
    private void requeueUnackedMessages() {
        for (AckData ackData : this.unackedMessageMap.removeAll()) {
            Message message = ackData.getMessage();
            String queueName = ackData.getQueueName();
            try {
                this.broker.requeue(queueName, message);
            } catch (BrokerException e) {
                // Trailing Throwable is logged as the exception by SLF4J; the two placeholders take the rest.
                LOGGER.error("Error while requeueing message [{}] for queue {}", message, queueName, e);
            } catch (ResourceNotFoundException e) {
                LOGGER.warn("Cannot requeue message [{}] since queue {} is not found", message, queueName, e);
            }
        }
    }

    /**
     * Removes the consumer registered under the given tag and closes it.
     *
     * @param shortString consumer tag
     * @throws ChannelException with {@code NOT_FOUND} when no consumer is registered under the tag
     */
    public void cancelConsumer(ShortString shortString) throws ChannelException {
        AmqpConsumer consumer = this.consumerMap.remove(shortString);
        if (consumer == null) {
            throw new ChannelException(ChannelException.NOT_FOUND,
                    "Invalid Consumer tag [ " + shortString + " ] for the channel: " + this.channelId);
        }
        closeConsumer(consumer);
    }

    /**
     * Removes the consumer from the broker and, if the broker reports success, releases the unacked
     * messages of the consumer's queue. The consumer metric is decremented in every case.
     */
    private void closeConsumer(Consumer consumer) {
        try {
            if (this.broker.removeConsumer(consumer)) {
                this.unackedMessageMap.releaseAndRemoveMessages(consumer.getQueueName());
            }
        } finally {
            this.metricManager.decrementConsumerCount();
        }
    }

    public InMemoryMessageAggregator getMessageAggregator() {
        return this.messageAggregator;
    }

    /**
     * Handles an acknowledgement for a single delivery tag.
     *
     * <p>The entry is marked, dequeued through the current transaction and, unless a transaction
     * block is open, removed from the marked set and released immediately. Inside a transaction
     * block the entry stays marked until commit or rollback.
     *
     * @param j delivery tag
     * @param z not used by this method
     *          (NOTE(review): presumably the AMQP "multiple" flag, which is not implemented here; confirm)
     * @throws BrokerException if the dequeue fails
     */
    public void acknowledge(long j, boolean z) throws BrokerException {
        AckData ackData = this.unackedMessageMap.markAcknowledgement(j);
        if (MessageTracer.isTraceEnabled()) {
            String description = (ackData != null) ? ACKNOWLEDGE_RECEIVED : UNKNOWN_ACKNOWLEDGEMENT;
            MessageTracer.trace(description, new TraceField[]{
                    this.traceChannelIdField,
                    new TraceField(DELIVERY_TAG_FIELD_NAME, Long.valueOf(j))});
        }
        if (ackData == null) {
            LOGGER.warn("Could not find a matching ack data for acking the delivery tag {}", j);
            return;
        }
        this.transaction.dequeue(ackData.getQueueName(), ackData.getMessage().getDetachableMessage());
        if (!this.transaction.inTransactionBlock()) {
            this.unackedMessageMap.removeMarkedAcknowledgment(j);
            ackData.getMessage().release();
        }
    }

    /** @return the next value of the channel's consumer tag counter (first call returns 1) */
    public int getNextConsumerTag() {
        return this.consumerTagGenerator.incrementAndGet();
    }

    /** @return the next value of the channel's delivery tag counter (first call returns 1) */
    public long getNextDeliveryTag() {
        return this.deliveryTagGenerator.incrementAndGet();
    }

    @Override // io.ballerina.messaging.broker.amqp.codec.AmqpChannelView
    public int getChannelId() {
        return this.channelId;
    }

    @Override // io.ballerina.messaging.broker.amqp.codec.AmqpChannelView
    public int getConsumerCount() {
        return this.consumerMap.size();
    }

    /** @return number of pending (not yet acknowledged) entries; marked entries are not counted */
    @Override // io.ballerina.messaging.broker.amqp.codec.AmqpChannelView
    public int getUnackedMessageCount() {
        return this.unackedMessageMap.pendingAcknowledgments.size();
    }

    @Override // io.ballerina.messaging.broker.amqp.codec.AmqpChannelView
    public int getDeliveryPendingMessageCount() {
        return this.deliveryPendingMessages.size();
    }

    /**
     * @return {@code "AutoCommit"}, {@code "LocalTransaction"} or {@code "DistributedTransaction"}
     *         depending on the runtime type of the current transaction, otherwise {@code "unidentified"}
     */
    @Override // io.ballerina.messaging.broker.amqp.codec.AmqpChannelView
    public String getTransactionType() {
        if (this.transaction instanceof AutoCommitTransaction) {
            return "AutoCommit";
        }
        if (this.transaction instanceof LocalTransaction) {
            return "LocalTransaction";
        }
        if (this.transaction instanceof DistributedTransaction) {
            return "DistributedTransaction";
        }
        return "unidentified";
    }

    @Override // io.ballerina.messaging.broker.amqp.codec.AmqpChannelView
    public int getPrefetchCount() {
        return this.prefetchCount;
    }

    /** @return the {@code System.currentTimeMillis()} value captured when the channel was constructed */
    @Override // io.ballerina.messaging.broker.amqp.codec.AmqpChannelView
    public long getCreatedTime() {
        return this.createdTime;
    }

    /**
     * Registers a delivery as awaiting acknowledgement.
     *
     * @param j       delivery tag
     * @param ackData data needed to acknowledge or requeue the delivery later
     */
    public void recordMessageDelivery(long j, AckData ackData) {
        this.unackedMessageMap.put(j, ackData);
    }

    /**
     * Handles a reject for a single delivery tag.
     *
     * @param j delivery tag
     * @param z requeue flag: when {@code true} the message is marked redelivered and requeued (or moved
     *          to the DLC once the max redelivery count is exceeded); when {@code false} it is released
     * @throws BrokerException           if requeueing or moving to the DLC fails
     * @throws ResourceNotFoundException if requeueing or moving to the DLC fails
     */
    public void reject(long j, boolean z) throws BrokerException, ResourceNotFoundException {
        this.metricManager.markReject();
        AckData ackData = this.unackedMessageMap.negativeAcknowledge(j);
        if (MessageTracer.isTraceEnabled()) {
            String description = (ackData != null) ? REJECT_RECEIVED : UNKNOWN_REJECT;
            MessageTracer.trace(description, new TraceField[]{
                    this.traceChannelIdField,
                    new TraceField(DELIVERY_TAG_FIELD_NAME, Long.valueOf(j)),
                    new TraceField(REQUEUE_FLAG_FIELD_NAME, Boolean.valueOf(z))});
        }
        if (ackData == null) {
            LOGGER.warn("Could not find a matching ack data for rejecting the delivery tag {}", j);
            return;
        }
        Message message = ackData.getMessage();
        if (z) {
            setRedeliverAndRequeue(message, ackData.getQueueName());
        } else {
            message.release();
            LOGGER.debug("Dropping message for delivery tag {}", j);
        }
    }

    /**
     * Marks the message as redelivered, then requeues it while the redelivery count is within
     * {@code maxRedeliveryCount}; beyond that the message is moved to the DLC.
     */
    private void setRedeliverAndRequeue(Message message, String str)
            throws BrokerException, ResourceNotFoundException {
        int redeliveryCount = message.setRedeliver();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Redelivery count is {} for message {}",
                    Integer.valueOf(redeliveryCount), Long.valueOf(message.getInternalId()));
        }
        if (redeliveryCount > this.maxRedeliveryCount) {
            this.broker.moveToDlc(str, message);
        } else {
            this.broker.requeue(str, message);
        }
    }

    /**
     * Removes and returns every unacknowledged entry (pending and marked) of this channel.
     *
     * @return the removed entries
     */
    public Collection<AckData> recover() {
        return this.unackedMessageMap.removeAll();
    }

    /**
     * Removes every unacknowledged entry and requeues its message. A missing queue is logged and
     * skipped.
     *
     * @throws BrokerException if a requeue fails
     */
    public void requeueAll() throws BrokerException {
        // NOTE(review): a BrokerException aborts the loop, and entries already taken out of the map are
        // neither requeued nor released; confirm that callers tolerate this.
        for (AckData ackData : this.unackedMessageMap.removeAll()) {
            String queueName = ackData.getQueueName();
            Message message = ackData.getMessage();
            try {
                this.broker.requeue(queueName, message);
            } catch (ResourceNotFoundException e) {
                LOGGER.warn("Cannot requeue message [{}] since queue [{}] is not found", message, queueName, e);
            }
        }
    }

    public int getConnectionId() {
        return this.connection.getId();
    }

    public void setFlow(boolean z) {
        this.flow.set(z);
    }

    /**
     * @return {@code true} when the connection is writable, flow is enabled, the prefetch window has
     *         room and the channel is not closed
     */
    public boolean isReady() {
        return this.connection.isWritable() && this.flow.get() && this.hasRoom.get() && !this.closed.get();
    }

    @Override // io.ballerina.messaging.broker.amqp.codec.AmqpChannelView
    public boolean isClosed() {
        return this.closed.get();
    }

    @Override // io.ballerina.messaging.broker.amqp.codec.AmqpChannelView
    public boolean isFlowEnabled() {
        return this.flow.get();
    }

    public ChannelFlowManager getFlowManager() {
        return this.flowManager;
    }

    /** Adds the message to the list of deliveries held back on this channel. */
    public void hold(AmqpDeliverMessage amqpDeliverMessage) {
        this.deliveryPendingMessages.add(amqpDeliverMessage);
    }

    /**
     * Drains the held-back deliveries.
     *
     * @return a copy of the held messages in insertion order; the internal list is empty afterwards
     */
    public List<AmqpDeliverMessage> getPendingMessages() {
        List<AmqpDeliverMessage> drained = new ArrayList<>(this.deliveryPendingMessages);
        this.deliveryPendingMessages.clear();
        return drained;
    }

    public void setPrefetchCount(int i) {
        this.prefetchCount = i;
    }

    /** Creates an {@link AmqpDeliverMessage} bound to this channel and its broker. */
    public AmqpDeliverMessage createDeliverMessage(Message message, ShortString shortString, String str) {
        return new AmqpDeliverMessage(message, shortString, this, str, this.broker);
    }

    /** Switches the channel (and its message aggregator) to a new local transaction from the broker. */
    public void setLocalTransactional() {
        this.transaction = this.broker.newLocalTransaction();
        this.messageAggregator.setTransaction(this.transaction);
    }

    /** Switches the channel (and its message aggregator) to a new distributed transaction from the broker. */
    public void setDistributedTransactional() {
        this.transaction = this.broker.newDistributedTransaction();
        this.messageAggregator.setTransaction(this.transaction);
    }

    /** Commits the current transaction, then releases every message whose acknowledgement was marked. */
    public void commit() throws ValidationException, BrokerException {
        this.transaction.commit();
        releasePendingAcknowledgements();
    }

    /** Removes all marked acknowledgements and releases their messages. */
    private void releasePendingAcknowledgements() {
        for (AckData ackData : this.unackedMessageMap.removeMarkedAcknowledgments()) {
            ackData.getMessage().release();
        }
    }

    /** Rolls back the current transaction and returns marked acknowledgements to the pending state. */
    public void rollback() throws ValidationException {
        this.transaction.rollback();
        this.unackedMessageMap.resetMarkedAcknowledgments();
    }

    /** @return {@code true} while the channel still uses an {@link AutoCommitTransaction} */
    public boolean isNonTransactional() {
        return this.transaction instanceof AutoCommitTransaction;
    }

    /** Forwards to {@code BrokerTransaction.start} with this channel's id. */
    public void startDtx(Xid xid, boolean z, boolean z2) throws ValidationException {
        this.transaction.start(xid, this.channelId, z, z2);
    }

    /** Forwards to {@code BrokerTransaction.end} with this channel's id. */
    public void endDtx(Xid xid, boolean z, boolean z2) throws ValidationException {
        this.transaction.end(xid, this.channelId, z, z2);
    }

    /** Forwards to {@code BrokerTransaction.prepare}. */
    public void prepare(Xid xid) throws BrokerException, ValidationException {
        this.transaction.prepare(xid);
    }

    /** Commits the given branch, then releases every message whose acknowledgement was marked. */
    public void commit(Xid xid, boolean z) throws ValidationException, BrokerException {
        this.transaction.commit(xid, z);
        releasePendingAcknowledgements();
    }

    /** Rolls back the given branch and returns marked acknowledgements to the pending state. */
    public void rollback(Xid xid) throws ValidationException, BrokerException {
        this.transaction.rollback(xid);
        this.unackedMessageMap.resetMarkedAcknowledgments();
    }

    /** Forwards to {@code BrokerTransaction.forget}. */
    public void forget(Xid xid) throws ValidationException {
        this.transaction.forget(xid);
    }

    /**
     * Sets the timeout of the given branch.
     *
     * @param xid branch identifier
     * @param j   timeout, passed on in {@link TimeUnit#SECONDS}
     */
    public void setTimeout(Xid xid, long j) throws ValidationException {
        this.transaction.setTimeout(xid, j, TimeUnit.SECONDS);
    }
}
