package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotImplementedException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Sender;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueSenderController.class */
/**
 * Sender controller that configures the sending end of an AMQP federation queue link.
 *
 * <p>During {@link #init(ProtonServerSenderContext)} the controller checks that the link was
 * attached on a connection carrying an {@link AMQPFederation} instance, resolves the queue named
 * by the remote source address, validates any JMS selector filter supplied on the source and then
 * asks the session SPI to create the server consumer that feeds the link.
 */
public final class AMQPFederationQueueSenderController extends AMQPFederationBaseSenderController {

    /**
     * Creates a controller bound to the given AMQP session context.
     *
     * @param aMQPSessionContext the session context handed to the base controller
     * @throws ActiveMQAMQPException if the base controller rejects the session
     */
    public AMQPFederationQueueSenderController(AMQPSessionContext aMQPSessionContext) throws ActiveMQAMQPException {
        super(aMQPSessionContext);
    }

    /**
     * Validates the remote attach request and creates the server consumer for the federated queue.
     *
     * @param protonServerSenderContext the sender context whose link is being initialised
     * @return the consumer created by the session SPI for the resolved queue
     * @throws ActiveMQAMQPIllegalStateException if the connection carries no federation instance
     * @throws ActiveMQAMQPNotImplementedException if the remote did not supply a source
     * @throws ActiveMQAMQPNotFoundException if the queue does not exist, or is bound to a different
     *         address than the one given in a fully qualified source address
     * @throws ActiveMQAMQPException if the supplied selector filter cannot be parsed
     * @throws Exception for any failure raised by the session SPI
     */
    @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
    public Consumer init(ProtonServerSenderContext protonServerSenderContext) throws Exception {
        final Sender sender = protonServerSenderContext.getSender();
        // Explicit narrowing to the messaging Source type used throughout this method.
        final Source source = (Source) sender.getRemoteSource();
        final AMQPFederation federation = (AMQPFederation) sender.getSession().getConnection().attachments()
                .get(AMQPFederation.FEDERATION_INSTANCE_RECORD, AMQPFederation.class);

        if (federation == null) {
            throw new ActiveMQAMQPIllegalStateException("Cannot create a federation link from non-federation connection");
        }
        if (source == null) {
            throw new ActiveMQAMQPNotImplementedException("Null source lookup not supported on federation links.");
        }

        // Mirror the sender settle mode the remote asked for and always answer FIRST for the receiver mode.
        sender.setSenderSettleMode(sender.getRemoteSenderSettleMode());
        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        // Advertise the federation queue receiver capability and request core message tunneling.
        sender.setOfferedCapabilities(new Symbol[]{AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER});
        sender.setDesiredCapabilities(new Symbol[]{AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT});

        final RoutingType routingType = getRoutingType(source);

        // A fully qualified source address carries both the address and the queue name; otherwise
        // the whole source address is treated as the queue name and no address check is made.
        final String sourceAddress = source.getAddress();
        final SimpleString expectedAddress;
        final SimpleString queueName;
        if (CompositeAddress.isFullyQualified(sourceAddress)) {
            expectedAddress = SimpleString.toSimpleString(CompositeAddress.extractAddressName(sourceAddress));
            queueName = SimpleString.toSimpleString(CompositeAddress.extractQueueName(sourceAddress));
        } else {
            expectedAddress = null;
            queueName = SimpleString.toSimpleString(sourceAddress);
        }

        final QueueQueryResult queueQuery = this.sessionSPI.queueQuery(queueName, routingType, false, null);
        if (!queueQuery.isExists()) {
            // Record the missing queue with the federation instance before refusing the link.
            federation.registerMissingQueue(queueName.toString());
            throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist");
        }
        if (expectedAddress != null && !queueQuery.getAddress().equals(expectedAddress)) {
            federation.registerMissingQueue(queueName.toString());
            throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' is not mapped to specified address: " + expectedAddress);
        }

        final String selector = resolveSelector(source, queueQuery);

        // Tunneling is only enabled when the remote offered the matching capability.
        this.tunnelCoreMessages = AmqpSupport.verifyOfferedCapabilities(sender, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);

        // If the queue is later deleted, register it as missing with the federation instance.
        this.resourceDeletedAction = errorCondition -> {
            federation.registerMissingQueue(queueName.toString());
        };

        return (Consumer) this.sessionSPI.createSender(protonServerSenderContext, queueName, selector, false);
    }

    /**
     * Determines the selector to apply to the consumer from the JMS selector filter on the source.
     *
     * @param source the remote source that may carry a JMS selector filter
     * @param queueQuery the query result for the queue being consumed from
     * @return the selector string, or {@code null} when the source carries no selector filter or
     *         the selector is identical to the filter string already reported for the queue
     * @throws ActiveMQAMQPException if the selector cannot be parsed; the parser failure is
     *         attached as the cause
     */
    private static String resolveSelector(Source source, QueueQueryResult queueQuery) throws ActiveMQAMQPException {
        final Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
        if (filter == null) {
            return null;
        }

        final String selector = filter.getValue().getDescribed().toString();
        try {
            SelectorParser.parse(selector);
        } catch (FilterException e) {
            // Keep the parser failure as the cause so the reason the selector was rejected is not lost.
            throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", e, ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
        }

        // A selector equal to the queue's own filter string is dropped rather than applied again.
        final boolean sameAsQueueFilter = queueQuery.getFilterString() != null
                && selector.equals(queueQuery.getFilterString().toString());
        return sameAsQueueFilter ? null : selector;
    }

    /**
     * Derives the routing type from the capabilities on the source.
     *
     * @param source the remote source, may be {@code null}
     * @return {@link RoutingType#MULTICAST} or {@link RoutingType#ANYCAST} according to the first
     *         topic or queue capability found, otherwise the configured default routing type
     */
    private static RoutingType getRoutingType(Source source) {
        if (source != null && source.getCapabilities() != null) {
            for (Symbol capability : source.getCapabilities()) {
                if (AmqpSupport.TOPIC_CAPABILITY.equals(capability)) {
                    return RoutingType.MULTICAST;
                }
                if (AmqpSupport.QUEUE_CAPABILITY.equals(capability)) {
                    return RoutingType.ANYCAST;
                }
            }
        }
        return ActiveMQDefaultConfiguration.getDefaultRoutingType();
    }
}
