package org.apache.activemq.artemis.protocol.amqp.connect.federation;

import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Terminus;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.class */
public class AMQPFederationEventDispatcher implements SenderController, ActiveMQServerBindingPlugin, ActiveMQServerAddressPlugin {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final Sender sender;
    private final AMQPFederation federation;
    private final AMQPSessionCallback session;
    private final ActiveMQServer server;
    private final Set<String> addressWatches = new HashSet();
    private final Set<String> queueWatches = new HashSet();
    private String eventsAddress;

    public AMQPFederationEventDispatcher(AMQPFederation aMQPFederation, AMQPSessionCallback aMQPSessionCallback, Sender sender) {
        this.session = aMQPSessionCallback;
        this.sender = sender;
        this.federation = aMQPFederation;
        this.server = aMQPFederation.getServer();
    }

    private String getEventsLinkAddress() {
        return this.eventsAddress;
    }

    public void sendEvent(AMQPMessage aMQPMessage) throws Exception {
        Objects.requireNonNull(aMQPMessage, "Null event message is not expected and constitutes an error condition");
        aMQPMessage.m4setAddress(getEventsLinkAddress());
        this.server.getPostOffice().route(aMQPMessage, true);
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
    public Consumer init(ProtonServerSenderContext protonServerSenderContext) throws Exception {
        AMQPFederation aMQPFederation = (AMQPFederation) protonServerSenderContext.getSender().getSession().getConnection().attachments().get(AMQPFederation.FEDERATION_INSTANCE_RECORD, AMQPFederation.class);
        if (aMQPFederation == null) {
            throw new ActiveMQAMQPIllegalStateException("Cannot create a federation link from non-federation connection");
        }
        this.sender.setSenderSettleMode(this.sender.getRemoteSenderSettleMode());
        this.sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        this.eventsAddress = aMQPFederation.prefixEventsLinkQueueName(this.sender.getName());
        if (this.sender.getLocalState() != EndpointState.ACTIVE) {
            this.sender.setOfferedCapabilities(new Symbol[]{AMQPFederationConstants.FEDERATION_EVENT_LINK});
            Terminus remoteSource = this.sender.getRemoteSource();
            if (remoteSource == null || !remoteSource.getDynamic()) {
                throw new ActiveMQAMQPInternalErrorException("Remote Terminus did not arrive as dynamic node: " + remoteSource);
            }
            remoteSource.setAddress(getEventsLinkAddress());
        }
        try {
            this.session.createTemporaryQueue(SimpleString.of(getEventsLinkAddress()), RoutingType.ANYCAST, 1);
            aMQPFederation.registerEventSender(this);
            this.server.registerBrokerPlugin(this);
            return this.session.createSender(protonServerSenderContext, SimpleString.of(getEventsLinkAddress()), null, false);
        } catch (Exception e) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
        }
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
    public void close() {
        this.server.unRegisterBrokerPlugin(this);
        try {
            this.session.removeTemporaryQueue(SimpleString.of(getEventsLinkAddress()));
        } catch (Exception e) {
        }
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.SenderController
    public void close(ErrorCondition errorCondition) {
        close();
    }

    public void addAddressWatch(String str) {
        this.addressWatches.add(str);
    }

    public void addQueueWatch(String str) {
        this.queueWatches.add(str);
    }

    public void afterAddAddress(AddressInfo addressInfo, boolean z) throws ActiveMQException {
        String simpleString = addressInfo.getName().toString();
        this.federation.getConnectionContext().runLater(() -> {
            if (this.addressWatches.remove(simpleString)) {
                try {
                    sendEvent(AMQPFederationEventSupport.encodeAddressAddedEvent(simpleString));
                } catch (Exception e) {
                    logger.warn("error on send of address added event: {}", e.getMessage());
                    this.federation.signalError(new ActiveMQAMQPInternalErrorException("Error while processing address added: " + e.getMessage()));
                }
            }
        });
    }

    public void afterAddBinding(Binding binding) throws ActiveMQException {
        if (binding instanceof QueueBinding) {
            String simpleString = ((QueueBinding) binding).getAddress().toString();
            String simpleString2 = ((QueueBinding) binding).getQueue().getName().toString();
            this.federation.getConnectionContext().runLater(() -> {
                if (this.queueWatches.remove(simpleString2)) {
                    try {
                        sendEvent(AMQPFederationEventSupport.encodeQueueAddedEvent(simpleString, simpleString2));
                    } catch (Exception e) {
                        logger.warn("Error on send of queue added event: {}", e.getMessage());
                        this.federation.signalError(new ActiveMQAMQPInternalErrorException("Error while processing queue added: " + e.getMessage()));
                    }
                }
            });
        }
    }
}
