package org.apache.activemq.artemis.core.server.cluster.impl;

import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
import org.apache.activemq.artemis.core.client.impl.ClientProducerFlowCallback;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.class */
public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener, ClientProducerFlowCallback {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    protected final ServerLocatorInternal serverLocator;
    // Runs the connect / pause / stop tasks and the disconnect cleanup.
    protected final Executor executor;
    protected final ScheduledExecutorService scheduledExecutor;
    private final UUID nodeUUID;
    // Generated from the server's storage manager in the constructor; exposed via sequentialID().
    private final long sequentialID;
    // Queue this bridge registers itself on as a consumer.
    protected final Queue queue;
    // Built from the configured filter string; when non-null, handle() rejects non-matching messages.
    private final Filter filter;
    // Optional; when non-null it is applied to each message in beforeForwardingNoCopy().
    private final Transformer transformer;
    // Updated by onCreditsFlow() and reset to false on each successful connect.
    private boolean blockedOnFlowControl;
    // Cancelled by stop() when non-null.
    protected ScheduledFuture<?> scheduledReconnection;
    // Created by ConnectRunnable; nulled by StopRunnable and disconnect(). isConnected() tests it for null.
    protected volatile ClientSessionInternal session;
    // Second session created alongside `session` from the same factory.
    protected volatile ClientSessionInternal sessionConsumer;
    protected String targetNodeID;
    protected TopologyMember targetNode;
    // Session factory obtained from createSessionFactory(); recreated when null or closed.
    private volatile ClientSessionFactoryInternal csf;
    private volatile ClientProducer producer;
    private boolean deliveringLargeMessage;
    // Initialised from the configuration's initial connect attempts.
    private int reconnectAttemptsInUse;
    private NotificationService notificationService;
    private final ActiveMQServer server;
    private final BridgeConfiguration configuration;
    // Installed as the thread's operation context for the duration of sendAcknowledged().
    private final OperationContextImpl bridgeContext;
    // Counted down in sendAcknowledged(); awaited by PauseRunnable / StopRunnable before changing state.
    private final ReusableLatch pendingAcks = new ReusableLatch(0);
    // In-flight references keyed by message ID; every access in this file synchronizes on the map itself.
    final Map<Long, MessageReference> refs = new LinkedHashMap();
    // Monitor held while a connect attempt runs and while keepConnecting is reset.
    private final Object connectionGuard = new Object();
    protected volatile boolean disconnectedAndDown = false;
    // Lifecycle state; a bridge starts out STOPPED.
    private volatile State state = State.STOPPED;
    // Incremented at the start of every connect attempt (and decremented again for attempts that should not count).
    private int retryCount = 0;
    // True while a connection still has to be established; cleared after a successful connect, set again by StopRunnable.
    private boolean keepConnecting = true;
    private final BridgeMetrics metrics = new BridgeMetrics();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl$1.class */
    /**
     * Synthetic switch map for {@link ComponentConfigurationRoutingType}, as emitted for an
     * enum {@code switch}. It maps each constant's ordinal to a dense case label
     * (ANYCAST=1, MULTICAST=2, STRIP=3, PASS=4) and is indexed by
     * {@code beforeForwardingNoCopy}.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$activemq$artemis$core$server$ComponentConfigurationRoutingType = new int[ComponentConfigurationRoutingType.values().length];

        static {
            // Each constant is registered in its own try block: if a constant is missing at
            // run time (NoSuchFieldError) its slot simply stays 0 and matches no case label.
            try {
                $SwitchMap$org$apache$activemq$artemis$core$server$ComponentConfigurationRoutingType[ComponentConfigurationRoutingType.ANYCAST.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$activemq$artemis$core$server$ComponentConfigurationRoutingType[ComponentConfigurationRoutingType.MULTICAST.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$activemq$artemis$core$server$ComponentConfigurationRoutingType[ComponentConfigurationRoutingType.STRIP.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$activemq$artemis$core$server$ComponentConfigurationRoutingType[ComponentConfigurationRoutingType.PASS.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl$ConnectRunnable.class */
    /**
     * Executor task that performs one connection attempt for the enclosing bridge.
     *
     * <p>The attempt is skipped while the bridge is STOPPING or PAUSING, and it is a no-op
     * once {@code keepConnecting} has been cleared by a previous successful attempt. On
     * success the bridge is moved to {@link State#STARTED}, registered as a consumer on its
     * queue and delivery is triggered. On failure a retry is scheduled (except for
     * interruptions, which are only logged).
     */
    private class ConnectRunnable implements Runnable {
        private ConnectRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (BridgeImpl.this.state == State.STOPPING || BridgeImpl.this.state == State.PAUSING) {
                BridgeImpl.logger.debug("Bridge {} state is {}. Ignoring call to connect.", BridgeImpl.this.configuration.getName(), BridgeImpl.this.state);
                return;
            }
            synchronized (BridgeImpl.this.connectionGuard) {
                if (!BridgeImpl.this.keepConnecting) {
                    return;
                }
                if (BridgeImpl.logger.isDebugEnabled()) {
                    BridgeImpl.logger.debug("Connecting  {} to its destination [{}], csf={}", new Object[]{this, BridgeImpl.this.nodeUUID, BridgeImpl.this.csf});
                }
                BridgeImpl.this.retryCount++;
                // Handlers are ordered most-specific first. The generic Exception handler has to
                // come last; otherwise it would intercept ActiveMQException and interruptions and
                // the SESSION_CREATION_REJECTED retry path below could never run.
                try {
                    if (BridgeImpl.this.csf == null || BridgeImpl.this.csf.isClosed()) {
                        // Re-check the state: a stop/pause may have been requested since the first test.
                        if (BridgeImpl.this.state == State.STOPPING || BridgeImpl.this.state == State.PAUSING) {
                            return;
                        }
                        BridgeImpl.this.csf = BridgeImpl.this.createSessionFactory();
                        if (BridgeImpl.this.csf == null) {
                            BridgeImpl.this.scheduleRetryConnect();
                            return;
                        }
                        BridgeImpl.this.session = BridgeImpl.this.csf.createSession(BridgeImpl.this.configuration.getUser(), BridgeImpl.this.configuration.getPassword(), false, true, true, true, 1);
                        BridgeImpl.this.session.getProducerCreditManager().setCallback(BridgeImpl.this);
                        BridgeImpl.this.sessionConsumer = BridgeImpl.this.csf.createSession(BridgeImpl.this.configuration.getUser(), BridgeImpl.this.configuration.getPassword(), false, true, true, true, 1);
                    }
                    if (BridgeImpl.this.configuration.getForwardingAddress() != null) {
                        try {
                            if (!BridgeImpl.this.session.addressQuery(SimpleString.of(BridgeImpl.this.configuration.getForwardingAddress())).isExists()) {
                                ActiveMQServerLogger.LOGGER.errorQueryingBridge(BridgeImpl.this.configuration.getForwardingAddress(), Integer.valueOf(BridgeImpl.this.retryCount));
                                BridgeImpl.this.scheduleRetryConnect();
                                return;
                            }
                        } catch (Throwable th) {
                            ActiveMQServerLogger.LOGGER.errorQueryingBridge(BridgeImpl.this.configuration.getName(), th);
                            // A failed query does not count as a connection attempt; retry shortly.
                            BridgeImpl.this.retryCount--;
                            BridgeImpl.this.scheduleRetryConnectFixedTimeout(100L);
                            return;
                        }
                    }
                    BridgeImpl.this.blockedOnFlowControl = false;
                    BridgeImpl.this.producer = BridgeImpl.this.session.createProducer();
                    BridgeImpl.this.session.addFailureListener(BridgeImpl.this);
                    BridgeImpl.this.session.setSendAcknowledgementHandler(BridgeImpl.this);
                    BridgeImpl.this.afterConnect();
                    BridgeImpl.this.state = State.STARTED;
                    BridgeImpl.this.queue.addConsumer(BridgeImpl.this);
                    BridgeImpl.this.queue.deliverAsync();
                    ActiveMQServerLogger.LOGGER.bridgeConnected(BridgeImpl.this);
                    BridgeImpl.this.serverLocator.addClusterTopologyListener(new ClusterTopologyListener() { // from class: org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl.ConnectRunnable.1
                        public void nodeUP(TopologyMember topologyMember, boolean z) {
                            // Must stay qualified: an unqualified call would recurse into this listener.
                            BridgeImpl.this.nodeUP(topologyMember, z);
                        }

                        public void nodeDown(long j, String str) {
                        }
                    });
                    BridgeImpl.this.keepConnecting = false;
                } catch (ActiveMQException e) {
                    if (e.getType() == ActiveMQExceptionType.SESSION_CREATION_REJECTED) {
                        // Rejected session creation does not count as a connection attempt.
                        ActiveMQServerLogger.LOGGER.errorStartingBridge(BridgeImpl.this.configuration.getName());
                        BridgeImpl.this.retryCount--;
                        BridgeImpl.this.scheduleRetryConnectFixedTimeout(BridgeImpl.this.configuration.getRetryInterval());
                    } else {
                        ActiveMQServerLogger.LOGGER.errorConnectingBridgeRetry(BridgeImpl.this);
                        BridgeImpl.logger.debug("Underlying bridge connection failure", e);
                        BridgeImpl.this.scheduleRetryConnect();
                    }
                } catch (ActiveMQInterruptedException | InterruptedException e) {
                    // Interrupted: log only, no retry is scheduled.
                    ActiveMQServerLogger.LOGGER.errorConnectingBridge(BridgeImpl.this, e);
                } catch (Exception e) {
                    ActiveMQServerLogger.LOGGER.errorConnectingBridge(BridgeImpl.this, e);
                    if (BridgeImpl.this.csf != null) {
                        try {
                            BridgeImpl.this.csf.close();
                            BridgeImpl.this.csf = null;
                        } catch (Throwable ignored) {
                            // Best effort: the factory is being discarded after a failed attempt.
                        }
                    }
                    BridgeImpl.this.fail(false, false);
                    BridgeImpl.this.scheduleRetryConnect();
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl$PauseRunnable.class */
    /**
     * Executor task that completes a pause request: detaches the bridge from its queue,
     * waits (bounded by the configured pending-ack timeout) for outstanding send
     * acknowledgements, marks the bridge PAUSED, cancels the remaining references and
     * emits a BRIDGE_STOPPED notification.
     */
    private class PauseRunnable implements Runnable {
        private PauseRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            final BridgeImpl bridge = BridgeImpl.this;
            try {
                logger.debug("pausing bridge {}", bridge);
                logger.trace("Removing consumer on pauseRunnable {} from queue {}", this, bridge.queue);
                bridge.queue.removeConsumer(bridge);

                final boolean acksDrained = bridge.pendingAcks.await(bridge.configuration.getPendingAckTimeout(), TimeUnit.MILLISECONDS);
                if (!acksDrained) {
                    ActiveMQServerLogger.LOGGER.timedOutWaitingForSendAcks("Pausing", bridge.configuration.getName(), bridge.pendingAcks.getCount());
                }

                // State changes are made while holding the bridge monitor, like start()/stop()/pause().
                synchronized (bridge) {
                    bridge.state = State.PAUSED;
                }

                bridge.internalCancelReferences();
                bridge.sendNotification(CoreNotificationType.BRIDGE_STOPPED);
                ActiveMQServerLogger.LOGGER.bridgePaused(bridge.configuration.getName());
            } catch (Exception failure) {
                ActiveMQServerLogger.LOGGER.errorPausingBridge(bridge.configuration.getName(), failure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl$ScheduledConnectRunnable.class */
    /**
     * Task that hands a fresh {@link ConnectRunnable} to the bridge executor, but only
     * while the bridge is still started (STARTING or STARTED).
     */
    public class ScheduledConnectRunnable implements Runnable {
        private ScheduledConnectRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (!BridgeImpl.this.isStarted()) {
                return;
            }
            final Runnable connectTask = new ConnectRunnable();
            BridgeImpl.this.executor.execute(connectTask);
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl$State.class */
    /** Lifecycle states of the bridge. */
    public enum State {
        /** Set by {@code start()} before the connect task is submitted. */
        STARTING,
        /** Set by the connect task once the bridge is connected. */
        STARTED,
        /** Set by {@code pause()} before the pause task is submitted. */
        PAUSING,
        /** Set by the pause task after waiting for pending acknowledgements. */
        PAUSED,
        /** Set by {@code stop()} before the stop task is submitted. */
        STOPPING,
        /** Initial state; also set by the stop task after waiting for pending acknowledgements. */
        STOPPED
    }

    /* loaded from: input_file:org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl$StopRunnable.class */
    /**
     * Executor task that completes a stop request: detaches the bridge from its queue,
     * waits (bounded by the configured pending-ack timeout) for outstanding send
     * acknowledgements, marks the bridge STOPPED, closes both sessions, cancels the
     * remaining references, cleans up the session factory, re-arms {@code keepConnecting}
     * for a later start and emits a BRIDGE_STOPPED notification.
     */
    private class StopRunnable implements Runnable {
        private StopRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                BridgeImpl.logger.debug("stopping bridge {}", BridgeImpl.this);
                BridgeImpl.logger.trace("Removing consumer on stopRunnable {} from queue {}", this, BridgeImpl.this.queue);
                BridgeImpl.this.queue.removeConsumer(BridgeImpl.this);
                if (!BridgeImpl.this.pendingAcks.await(BridgeImpl.this.configuration.getPendingAckTimeout(), TimeUnit.MILLISECONDS)) {
                    ActiveMQServerLogger.LOGGER.timedOutWaitingForSendAcks("Stopping", BridgeImpl.this.configuration.getName(), BridgeImpl.this.pendingAcks.getCount());
                }
                synchronized (BridgeImpl.this) {
                    BridgeImpl.this.state = State.STOPPED;
                }
                if (BridgeImpl.this.session != null) {
                    BridgeImpl.logger.debug("Cleaning up session {} for bridge {}", BridgeImpl.this.session, BridgeImpl.this.configuration.getName());
                    BridgeImpl.this.session.removeFailureListener(BridgeImpl.this);
                    try {
                        BridgeImpl.this.session.close();
                        BridgeImpl.this.session = null;
                    } catch (ActiveMQException closeFailure) {
                        // Best effort: a failed close must not abort the rest of the shutdown.
                        BridgeImpl.logger.debug("Failed to close session for bridge {}", BridgeImpl.this.configuration.getName(), closeFailure);
                    }
                }
                if (BridgeImpl.this.sessionConsumer != null) {
                    // Log the consumer session itself; `session` may already be null at this point.
                    BridgeImpl.logger.debug("Cleaning up session {}", BridgeImpl.this.sessionConsumer);
                    try {
                        BridgeImpl.this.sessionConsumer.close();
                        BridgeImpl.this.sessionConsumer = null;
                    } catch (ActiveMQException closeFailure) {
                        // Best effort: a failed close must not abort the rest of the shutdown.
                        BridgeImpl.logger.debug("Failed to close consumer session for bridge {}", BridgeImpl.this.configuration.getName(), closeFailure);
                    }
                }
                BridgeImpl.this.internalCancelReferences();
                if (BridgeImpl.this.csf != null) {
                    BridgeImpl.this.csf.cleanup();
                }
                // Allow the next start() to run a full connect attempt again.
                synchronized (BridgeImpl.this.connectionGuard) {
                    BridgeImpl.this.keepConnecting = true;
                }
                BridgeImpl.this.sendNotification(CoreNotificationType.BRIDGE_STOPPED);
                ActiveMQServerLogger.LOGGER.bridgeStopped(BridgeImpl.this.configuration.getName());
            } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorStoppingBridge(BridgeImpl.this.configuration.getName(), e);
            }
        }
    }

    /**
     * Creates a bridge in its initial (stopped) state; nothing is connected until
     * {@code start()} is called.
     *
     * @param serverLocatorInternal    locator used for the target connection
     * @param bridgeConfiguration      bridge settings (name, filter string, transformer, connect attempts, ...)
     * @param uuid                     node UUID, used when building duplicate-detection bytes
     * @param queue                    queue this bridge consumes from
     * @param executor                 executor for the bridge's tasks and its operation context
     * @param scheduledExecutorService scheduler kept for delayed work
     * @param activeMQServer           owning server; supplies the sequential ID and the transformer
     * @throws ActiveMQException declared by the constructor; presumably raised when the filter string is invalid (confirm against FilterImpl)
     */
    public BridgeImpl(ServerLocatorInternal serverLocatorInternal, BridgeConfiguration bridgeConfiguration, UUID uuid, Queue queue, Executor executor, ScheduledExecutorService scheduledExecutorService, ActiveMQServer activeMQServer) throws ActiveMQException {
        // Values computed from the collaborators (kept in their original evaluation order).
        this.sequentialID = activeMQServer.getStorageManager().generateID();
        this.reconnectAttemptsInUse = bridgeConfiguration.getInitialConnectAttempts();
        this.transformer = activeMQServer.getServiceRegistry().getBridgeTransformer(bridgeConfiguration.getName(), bridgeConfiguration.getTransformerConfiguration());
        this.filter = FilterImpl.createFilter(bridgeConfiguration.getFilterString());
        this.bridgeContext = new OperationContextImpl(executor);

        // Collaborators stored as-is.
        this.server = activeMQServer;
        this.configuration = bridgeConfiguration;
        this.serverLocator = serverLocatorInternal;
        this.nodeUUID = uuid;
        this.queue = queue;
        this.executor = executor;
        this.scheduledExecutor = scheduledExecutorService;
    }

    /**
     * Builds the 24-byte duplicate-detection value for a message: the UUID's bytes
     * followed by the big-endian encoding of {@code j}.
     *
     * @param uuid node UUID whose {@code asBytes()} form is written first (presumably 16 bytes — confirm)
     * @param j    value appended as 8 bytes (callers in this file pass the message ID)
     * @return a new 24-byte array
     */
    public static final byte[] getDuplicateBytes(UUID uuid, long j) {
        return ByteBuffer.allocate(24).put(uuid.asBytes()).putLong(j).array();
    }

    /** @return the flag last set by {@code onCreditsFlow} (reset to false on each successful connect) */
    public boolean isBlockedOnFlowControl() {
        return this.blockedOnFlowControl;
    }

    /** @return the current session factory, or {@code null} if none has been created or it was discarded */
    public ClientSessionFactory getSessionFactory() {
        return this.csf;
    }

    /**
     * Returns a snapshot of the references currently tracked in {@code refs}, in the
     * map's iteration order. The copy is taken while holding the map's monitor.
     *
     * @return a new, independent list of in-flight references
     */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public List<MessageReference> getDeliveringMessages() {
        synchronized (this.refs) {
            return new ArrayList<>(this.refs.values());
        }
    }

    /**
     * Null-safe wrapper around {@code ClientSessionFactoryInternal.cleanup()}.
     *
     * @param clientSessionFactoryInternal factory to clean up; {@code null} is ignored
     */
    private static void cleanUpSessionFactory(ClientSessionFactoryInternal clientSessionFactoryInternal) {
        if (clientSessionFactoryInternal == null) {
            return;
        }
        clientSessionFactoryInternal.cleanup();
    }

    /**
     * Stores the notification service for this bridge.
     *
     * @param notificationService service to keep; replaces any previously set value
     */
    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public void setNotificationService(NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    /**
     * Flow-control callback: records whether the producer is blocked and, when it is
     * not, asks the queue to deliver again.
     *
     * @param z                     {@code true} if the producer is currently blocked
     * @param clientProducerCredits credits object of the callback (not used here)
     */
    public void onCreditsFlow(boolean z, ClientProducerCredits clientProducerCredits) {
        if (logger.isTraceEnabled()) {
            logger.trace("Bridge {} received credits, with blocked = {}", this.configuration.getName(), z);
        }
        this.blockedOnFlowControl = z;
        if (!z) {
            this.queue.deliverAsync();
        }
    }

    /**
     * Flow-control callback for a credit failure: logs that the address is full and
     * disconnects the bridge.
     *
     * @param clientProducerCredits credits object whose address is reported in the log
     */
    public void onCreditsFail(ClientProducerCredits clientProducerCredits) {
        final String fullAddress = String.valueOf(clientProducerCredits.getAddress());
        final String bridgeName = this.configuration.getName();
        ActiveMQServerLogger.LOGGER.bridgeAddressFull(fullAddress, bridgeName);
        disconnect();
    }

    /** @return the ID generated from the server's storage manager when this bridge was constructed */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public long sequentialID() {
        return this.sequentialID;
    }

    /**
     * Starts the bridge: moves it to STARTING, submits a connect task to the executor and
     * sends a BRIDGE_STARTED notification.
     *
     * <p>The call is ignored when the bridge is already STARTING or STARTED.
     *
     * @throws Exception if the bridge is currently STOPPING or PAUSING
     */
    public synchronized void start() throws Exception {
        final State current = this.state;
        final boolean inTransitionDown = current == State.STOPPING || current == State.PAUSING;
        final boolean alreadyUp = current == State.STARTING || current == State.STARTED;

        if (alreadyUp || inTransitionDown) {
            logger.debug("Bridge {} state is {}. Ignoring call to start.", this.configuration.getName(), current);
            if (inTransitionDown) {
                throw ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(this.configuration.getName(), "started", current);
            }
            return;
        }

        this.state = State.STARTING;
        logger.debug("Bridge {} is starting", this.configuration.getName());
        this.executor.execute(new ConnectRunnable());
        sendNotification(CoreNotificationType.BRIDGE_STARTED);
    }

    /** @return the same text as {@code toString()} */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public String debug() {
        return toString();
    }

    /**
     * Cancels every in-flight reference back to its queue.
     *
     * <p>The tracked references are copied and the map is cleared under the map's monitor;
     * the cancellation itself then runs outside the lock, newest reference first. A failure
     * to cancel one reference is logged and does not stop the remaining ones.
     */
    private void cancelRefs() {
        final List<MessageReference> pending;
        synchronized (this.refs) {
            pending = new ArrayList<>(this.refs.values());
            this.refs.clear();
        }
        if (logger.isTraceEnabled()) {
            logger.trace("BridgeImpl::cancelRefs cancelling {} references", Integer.valueOf(pending.size()));
        }
        // Return early regardless of the log level; only the trace message is level-dependent.
        if (pending.isEmpty()) {
            logger.trace("didn't have any references to cancel on bridge {}", this);
            return;
        }
        final long now = System.currentTimeMillis();
        // Walk backwards so the most recently tracked reference is cancelled first.
        final ListIterator<MessageReference> cursor = pending.listIterator(pending.size());
        while (cursor.hasPrevious()) {
            final MessageReference ref = cursor.previous();
            logger.trace("BridgeImpl::cancelRefs Cancelling reference {} on bridge {}", ref, this);
            try {
                ref.getQueue().cancel(ref, now);
            } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(ref, e);
            }
        }
    }

    /**
     * Waits until everything queued on the bridge executor before this call has run, by
     * submitting a latch task and waiting up to 10 seconds for it. A warning is logged if
     * the wait times out.
     */
    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public void flushExecutor() {
        // Must be typed as FutureLatch: await(long) is not part of Runnable.
        final FutureLatch flushMarker = new FutureLatch();
        this.executor.execute(flushMarker);
        if (!flushMarker.await(10000L)) {
            ActiveMQServerLogger.LOGGER.timedOutWaitingToStopBridge();
        }
    }

    /**
     * Asynchronously drops both sessions: on the bridge executor, each non-null session
     * is cleaned up (failures are only logged at debug level) and its field is set to
     * {@code null}.
     */
    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge, org.apache.activemq.artemis.core.server.Consumer
    public void disconnect() {
        executor.execute(() -> {
            // Producer-side session first.
            if (session != null) {
                try {
                    session.cleanUp(false);
                } catch (Exception cleanupFailure) {
                    logger.debug(cleanupFailure.getMessage(), cleanupFailure);
                }
                session = null;
            }

            // Then the consumer-side session.
            if (sessionConsumer != null) {
                try {
                    sessionConsumer.cleanUp(false);
                } catch (Exception cleanupFailure) {
                    logger.debug(cleanupFailure.getMessage(), cleanupFailure);
                }
                sessionConsumer = null;
            }
        });
    }

    /** @return {@code true} while a session object is held; the session itself is not probed */
    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public boolean isConnected() {
        return this.session != null;
    }

    /** @return the executor passed to the constructor */
    public Executor getExecutor() {
        return this.executor;
    }

    /**
     * Requests a stop: moves the bridge to STOPPING, cancels any scheduled reconnection
     * and submits the stop task to the executor.
     *
     * <p>The call is ignored when the bridge is already STOPPING or STOPPED.
     *
     * @throws Exception if the bridge is currently PAUSING
     */
    public synchronized void stop() throws Exception {
        final State current = this.state;
        final boolean notStoppable = current == State.STOPPING || current == State.STOPPED || current == State.PAUSING;

        if (notStoppable) {
            logger.debug("Bridge {} state is {}. Ignoring call to stop.", this.configuration.getName(), current);
            if (current == State.PAUSING) {
                throw ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(this.configuration.getName(), "stopped", current);
            }
            return;
        }

        this.state = State.STOPPING;
        logger.debug("Bridge {} is stopping", this.configuration.getName());
        if (this.scheduledReconnection != null) {
            this.scheduledReconnection.cancel(true);
        }
        this.executor.execute(new StopRunnable());
    }

    /**
     * Requests a pause: moves the bridge to PAUSING and submits the pause task to the
     * executor.
     *
     * <p>The call is ignored when the bridge is already PAUSING or PAUSED.
     *
     * @throws Exception if the bridge is currently STOPPING or STOPPED
     */
    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public synchronized void pause() throws Exception {
        final State current = this.state;
        final boolean stopInProgressOrDone = current == State.STOPPING || current == State.STOPPED;
        final boolean pauseInProgressOrDone = current == State.PAUSING || current == State.PAUSED;

        if (stopInProgressOrDone || pauseInProgressOrDone) {
            logger.debug("Bridge {} state is {}. Ignoring call to pause.", this.configuration.getName(), current);
            if (stopInProgressOrDone) {
                throw ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(this.configuration.getName(), "paused", current);
            }
            return;
        }

        this.state = State.PAUSING;
        logger.info("Bridge {} is pausing", this.configuration.getName());
        this.executor.execute(new PauseRunnable());
    }

    /**
     * Re-registers this bridge as a consumer on its queue and triggers asynchronous delivery.
     *
     * <p>NOTE(review): this method does not change {@code state}, while {@code handle}
     * answers BUSY unless the state is STARTED — confirm where a PAUSED bridge is expected
     * to be moved back to STARTED.
     *
     * @throws Exception declared by the interface; whatever {@code queue.addConsumer} raises
     */
    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public void resume() throws Exception {
        this.queue.addConsumer(this);
        this.queue.deliverAsync();
    }

    /** @return {@code true} when the bridge is STARTING or STARTED */
    public boolean isStarted() {
        if (this.state == State.STARTING) {
            return true;
        }
        return this.state == State.STARTED;
    }

    /** @return the configured bridge name converted with {@code SimpleString.of} */
    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public SimpleString getName() {
        return SimpleString.of(this.configuration.getName());
    }

    /** @return the queue passed to the constructor, i.e. the one this bridge consumes from */
    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public Queue getQueue() {
        return this.queue;
    }

    /** @return the filter created from the configured filter string; may be {@code null} (handle() checks for that) */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public Filter getFilter() {
        return this.filter;
    }

    /**
     * @return the configured forwarding address converted with {@code SimpleString.of}.
     *         NOTE(review): the configured value may be {@code null} (the connect task checks
     *         for that); the result then depends on how {@code SimpleString.of} treats null — verify.
     */
    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public SimpleString getForwardingAddress() {
        return SimpleString.of(this.configuration.getForwardingAddress());
    }

    /**
     * Returns the remoting connection of the current session.
     *
     * @return the session's connection, or {@code null} when no session is held
     */
    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public RemotingConnection getForwardingConnection() {
        // Read the volatile field once: disconnect() and the stop task null it on another
        // thread, so a separate check and dereference could hit a NullPointerException.
        final ClientSessionInternal current = this.session;
        return current == null ? null : current.getConnection();
    }

    /**
     * Send-failure callback. Only address-full failures are acted upon: they are logged
     * at warn level and routed to {@code failed}. Every other exception type is ignored.
     *
     * @param message message whose send failed (not used here)
     * @param exc     reported failure
     */
    public void sendFailed(Message message, Exception exc) {
        if (!(exc instanceof ActiveMQAddressFullException)) {
            return;
        }
        logger.warn(exc.getMessage(), exc);
        failed(exc);
    }

    /**
     * Send-acknowledgement callback: acknowledges the matching in-flight reference on its
     * queue, counts down the pending-ack latch, updates the metrics and notifies bridge
     * plugins.
     *
     * <p>The bridge's own operation context is installed for the duration of the call and
     * the caller's context is restored afterwards. Confirmations are only processed while
     * the bridge is STARTED, STOPPING or PAUSING; in any other state they are logged and
     * dropped. Failures while acknowledging are logged, not propagated.
     *
     * @param message the message the target confirmed; looked up by its message ID
     */
    public void sendAcknowledged(Message message) {
        final OperationContext callerContext = OperationContextImpl.getContext();
        try {
            OperationContextImpl.setContext(this.bridgeContext);
            logger.debug("Bridge {} received confirmation for message {}", this.configuration.getName(), message);

            final State current = this.state;
            final boolean acceptsAcks = current == State.STARTED || current == State.STOPPING || current == State.PAUSING;
            if (!acceptsAcks) {
                logger.debug("Bridge {} state is {}. Ignoring call to sendAcknowledged.", this.configuration.getName(), current);
                return;
            }

            try {
                final MessageReference acked;
                synchronized (this.refs) {
                    acked = this.refs.remove(Long.valueOf(message.getMessageID()));
                }
                if (acked == null) {
                    logger.trace("BridgeImpl::sendAcknowledged bridge {} could not find reference for message {}", this, message);
                    return;
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("BridgeImpl::sendAcknowledged bridge {} Acking {} on queue {}", new Object[]{this, acked, acked.getQueue()});
                }
                acked.getQueue().acknowledge(acked);
                this.pendingAcks.countDown();
                this.metrics.incrementMessagesAcknowledged();
                if (this.server.hasBrokerBridgePlugins()) {
                    this.server.callBrokerBridgePlugins(plugin -> {
                        plugin.afterAcknowledgeBridge(this, acked);
                    });
                }
            } catch (Exception ackFailure) {
                ActiveMQServerLogger.LOGGER.bridgeFailedToAck(ackFailure);
            }
        } finally {
            // Always hand the thread back with the context it arrived with.
            OperationContextImpl.setContext(callerContext);
        }
    }

    /**
     * Reports a failure as a (non-failover) connection failure. An
     * {@link ActiveMQException} is passed through unchanged; anything else is wrapped in a
     * new {@code ActiveMQException} carrying the same message and the original as cause.
     *
     * @param th the failure to report
     */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public void failed(Throwable th) {
        final ActiveMQException failure;
        if (th instanceof ActiveMQException) {
            failure = (ActiveMQException) th;
        } else {
            failure = new ActiveMQException(th.getMessage());
            failure.initCause(th);
        }
        connectionFailed(failure, false);
    }

    /**
     * Prepares a message for forwarding without touching the original: the message is
     * copied and the copy is run through {@code beforeForwardingNoCopy}.
     *
     * @param message      message taken from the bridge queue
     * @param simpleString forwarding address to set on the copy, or {@code null} to keep its address
     * @return the message to send to the target
     * @throws ClassCastException if the copy is not a {@link RefCountMessage}
     */
    protected Message beforeForward(Message message, SimpleString simpleString) {
        // copy() is declared on Message, so the narrowing to RefCountMessage must be explicit.
        final RefCountMessage copy = (RefCountMessage) message.copy();
        // NOTE(review): the copy is registered as its own parent reference, not the original
        // message — kept as-is; confirm that this is intended.
        copy.setParentRef(copy);
        return beforeForwardingNoCopy(copy, simpleString);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Prepares {@code message} in place for forwarding and returns the message to send.
     *
     * <p>Steps, in order: add the bridge duplicate-ID bytes when duplicate detection is
     * enabled; override the address when one is given; apply the configured routing type;
     * mark the message as changed; run the optional transformer; and finally embed the
     * result as a core message.
     *
     * @param message      message to modify (callers that must keep the original pass a copy)
     * @param simpleString forwarding address, or {@code null} to keep the message's address
     * @return the (possibly transformed) message, embedded as a core message
     */
    public Message beforeForwardingNoCopy(Message message, SimpleString simpleString) {
        if (this.configuration.isUseDuplicateDetection()) {
            message.putExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, getDuplicateBytes(this.nodeUUID, message.getMessageID()));
        }
        if (simpleString != null) {
            message.setAddress(simpleString);
        }
        // Switch on the enum itself instead of an ordinal lookup table with numeric labels.
        switch (this.configuration.getRoutingType()) {
            case ANYCAST:
                message.setRoutingType(RoutingType.ANYCAST);
                break;
            case MULTICAST:
                message.setRoutingType(RoutingType.MULTICAST);
                break;
            case STRIP:
                // Remove the routing type from the message.
                message.setRoutingType((RoutingType) null);
                break;
            case PASS:
            default:
                // Leave the routing type exactly as it is on the message.
                break;
        }
        message.messageChanged();
        if (this.transformer == null) {
            return EmbedMessageUtil.embedAsCoreMessage(message);
        }
        final Message transformed = this.transformer.transform(message);
        if (transformed != message) {
            logger.debug("The transformer {} made a copy of the message {} as transformedMessage", this.transformer, message);
        }
        return EmbedMessageUtil.embedAsCoreMessage(transformed);
    }

    /** Calls {@code deliverAsync()} on this bridge's queue. */
    public void readyForWriting() {
        queue.deliverAsync();
    }

    /**
     * Offers a message reference to the bridge for forwarding.
     *
     * <p>Outcomes visible in this method:
     * <ul>
     *   <li>{@code NO_MATCH} when a filter is set and rejects the message;</li>
     *   <li>{@code BUSY} when the bridge is not in {@code STARTED} state, the session is not
     *       writable, the bridge is blocked on flow control, or a large message is still being
     *       delivered;</li>
     *   <li>otherwise the status of the delivery: {@code HANDLED} for a large message (which is
     *       sent asynchronously), or whatever {@code deliverStandardMessage} returns.</li>
     * </ul>
     *
     * @param messageReference the reference being offered
     * @return the handling status as described above
     * @throws Exception if a plugin callback or the delivery throws; the pending-ack counter is
     *                   decremented again before the exception propagates
     */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public HandleStatus handle(MessageReference messageReference) throws Exception {
        HandleStatus deliverStandardMessage;
        // Optional reference tracing, only for messages that are RefCountMessage instances.
        if (RefCountMessage.isRefTraceEnabled() && (messageReference.getMessage() instanceof RefCountMessage)) {
            RefCountMessage.deferredDebug(messageReference.getMessage(), "Going through the bridge", new Object[0]);
        }
        // The filter check happens outside the monitor.
        if (this.filter != null && !this.filter.match(messageReference.getMessage())) {
            logger.trace("message reference {} is no match for bridge {}", messageReference, this.configuration.getName());
            return HandleStatus.NO_MATCH;
        }
        synchronized (this) {
            if (this.state != State.STARTED || !this.session.isWritable(this)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("{}::Ignoring reference on bridge as it is set to inactive ref {}, active = false", this, messageReference);
                }
                return HandleStatus.BUSY;
            }
            if (this.blockedOnFlowControl) {
                logger.debug("Bridge {} is blocked on flow control, cannot receive {}", this.configuration.getName(), messageReference);
                return HandleStatus.BUSY;
            }
            if (this.deliveringLargeMessage) {
                logger.trace("Bridge {} is busy delivering a large message", this.configuration.getName());
                return HandleStatus.BUSY;
            }
            logger.trace("Bridge {} is handling reference {} ", this.configuration.getName(), messageReference);
            messageReference.handled();
            // Remember the reference under its message id.
            synchronized (this.refs) {
                this.refs.put(Long.valueOf(messageReference.getMessage().getMessageID()), messageReference);
            }
            // Destination: the configured forwarding address if there is one, else the message's own address.
            SimpleString of = this.configuration.getForwardingAddress() != null ? SimpleString.of(this.configuration.getForwardingAddress()) : messageReference.getMessage().getAddressSimpleString();
            Message beforeForward = beforeForward(messageReference.getMessage(), of);
            // Counted up before sending; the catch block below undoes this if anything throws.
            this.pendingAcks.countUp();
            try {
                if (this.server.hasBrokerBridgePlugins()) {
                    this.server.callBrokerBridgePlugins(activeMQServerBridgePlugin -> {
                        activeMQServerBridgePlugin.beforeDeliverBridge(this, messageReference);
                    });
                }
                if (beforeForward.isLargeMessage()) {
                    // The flag is set here and cleared by the asynchronous send task.
                    this.deliveringLargeMessage = true;
                    deliverLargeMessage(of, messageReference, (LargeServerMessage) beforeForward);
                    deliverStandardMessage = HandleStatus.HANDLED;
                } else {
                    deliverStandardMessage = deliverStandardMessage(of, messageReference, beforeForward, messageReference.getMessage());
                }
                if (deliverStandardMessage == HandleStatus.HANDLED) {
                    this.metrics.incrementMessagesPendingAcknowledgement();
                }
                if (this.server.hasBrokerBridgePlugins()) {
                    // Copy into a local so the lambda captures an effectively final value.
                    HandleStatus handleStatus = deliverStandardMessage;
                    this.server.callBrokerBridgePlugins(activeMQServerBridgePlugin2 -> {
                        activeMQServerBridgePlugin2.afterDeliverBridge(this, messageReference, handleStatus);
                    });
                }
                return deliverStandardMessage;
            } catch (Exception e) {
                // Undo the countUp() above before propagating the failure.
                this.pendingAcks.countDown();
                throw e;
            }
        }
    }

    /**
     * Does nothing in this implementation.
     *
     * @param messageReference ignored
     */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public void proceedDeliver(MessageReference messageReference) {
        // Intentionally empty.
    }

    /**
     * Convenience overload that delegates to
     * {@link #connectionFailed(ActiveMQException, boolean, String)} with a {@code null} third argument.
     *
     * @param activeMQException the failure
     * @param z                 flag passed through unchanged
     */
    public void connectionFailed(ActiveMQException activeMQException, boolean z) {
        final String noScaleDownTarget = null;
        connectionFailed(activeMQException, z, noScaleDownTarget);
    }

    /**
     * Handles a failed bridge connection: logs the event (only while the server is started),
     * marks the bridge as wanting to keep connecting, releases the producer, session factory
     * and session on a best-effort basis, then either scales down or fails the bridge, and
     * finally asks for a reconnect to be scheduled.
     *
     * <p>Cleanup failures never abort this method; they are logged at debug level.
     *
     * @param activeMQException the failure; its type decides the first argument of {@code fail}
     *                          when {@code str} is {@code null}
     * @param z                 flag forwarded to the server log message
     * @param str               scale-down target node id, or {@code null} if none was received
     */
    public void connectionFailed(ActiveMQException activeMQException, boolean z, String str) {
        if (this.server.isStarted()) {
            if (activeMQException instanceof ActiveMQDisconnectedException) {
                ActiveMQServerLogger.LOGGER.bridgeConnectionClosed(Boolean.valueOf(z));
            } else {
                ActiveMQServerLogger.LOGGER.bridgeConnectionFailed(Boolean.valueOf(z));
            }
        }
        synchronized (this.connectionGuard) {
            this.keepConnecting = true;
        }
        try {
            if (this.producer != null) {
                this.producer.close();
            }
            cleanUpSessionFactory(this.csf);
        } catch (Throwable th) {
            // Best-effort cleanup: record the problem instead of silently dropping it.
            logger.debug("Bridge {} failed to close its producer or clean up its session factory", this.configuration.getName(), th);
        }
        try {
            // Read the field once; a missing session simply means there is nothing to clean up.
            ClientSessionInternal currentSession = this.session;
            if (currentSession != null) {
                currentSession.cleanUp(false);
            }
        } catch (Throwable th) {
            logger.debug("Bridge {} failed to clean up its session", this.configuration.getName(), th);
        }
        if (str != null && !str.equals(this.nodeUUID.toString())) {
            // The target id names a different node than this one.
            scaleDown(str);
        } else if (str != null) {
            // The target id equals this node's own UUID.
            logger.debug("Received scaleDownTargetNodeID: {}; cancelling reconnect.", str);
            fail(true, true);
        } else {
            logger.debug("Received null scaleDownTargetNodeID");
            fail(activeMQException.getType() == ActiveMQExceptionType.DISCONNECTED, false);
        }
        tryScheduleRetryReconnect(activeMQException.getType());
    }

    /**
     * Moves this bridge's queue references via {@code moveReferencesBetweenSnFQueues} using the
     * given node id, then calls {@code fail(true, true)}. Runs while holding this object's
     * monitor. Any exception is logged at warn level and not rethrown.
     *
     * @param str the scale-down target node id
     */
    protected void scaleDown(String str) {
        synchronized (this) {
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug("Moving {} messages from {} to {}", this.queue.getMessageCount(), this.queue.getName(), str);
                }
                final SimpleString targetNode = SimpleString.of(str);
                ((QueueImpl) this.queue).moveReferencesBetweenSnFQueues(targetNode);
                fail(true, true);
            } catch (Exception failure) {
                logger.warn(failure.getMessage(), failure);
            }
        }
    }

    /**
     * Schedules a reconnect attempt. This implementation ignores the exception type and always
     * delegates to {@link #scheduleRetryConnect()}.
     *
     * @param activeMQExceptionType type of the failure that triggered the retry (unused here)
     */
    protected void tryScheduleRetryReconnect(ActiveMQExceptionType activeMQExceptionType) {
        this.scheduleRetryConnect();
    }

    /**
     * Does nothing in this implementation.
     *
     * @param activeMQException ignored
     */
    public void beforeReconnect(ActiveMQException activeMQException) {
        // Intentionally empty.
    }

    /**
     * Sends a large message on the bridge's executor instead of the calling thread.
     *
     * <p>On success the large-message flag is cleared and the queue (if non-null) is asked to
     * deliver again. On an {@link ActiveMQException} the flag is cleared, the failure is logged
     * and {@code connectionFailed(e, false)} is invoked. Other throwables are not caught here.
     *
     * @param simpleString       the address to send to
     * @param messageReference   the reference being delivered; used only for error logging
     * @param largeServerMessage the large message to send
     */
    private void deliverLargeMessage(SimpleString simpleString, MessageReference messageReference, LargeServerMessage largeServerMessage) {
        this.executor.execute(() -> {
            logger.trace("going to send large message: {} from {}", largeServerMessage, this.queue);
            try {
                this.producer.send(simpleString, largeServerMessage.toMessage());
                // Clear the flag before triggering delivery so handle() stops returning BUSY for it.
                unsetLargeMessageDelivery();
                if (this.queue != null) {
                    this.queue.deliverAsync();
                }
            } catch (ActiveMQException e) {
                unsetLargeMessageDelivery();
                ActiveMQServerLogger.LOGGER.bridgeUnableToSendMessage(messageReference, e);
                connectionFailed(e, false);
            }
        });
    }

    /**
     * Sends a regular (non-large) message through the producer on the calling thread.
     *
     * <p>If the send throws an {@link ActiveMQException}, the failure is logged, the entry for
     * the sent message's id is removed from {@code refs}, the delivering count of the
     * reference's queue is decremented, {@code connectionFailed(e, false)} is invoked and
     * {@code BUSY} is returned. {@code message2.usageDown()} runs exactly once on every exit
     * path, including when an unexpected throwable propagates.
     *
     * @param simpleString     the address to send to
     * @param messageReference the reference being delivered
     * @param message          the message to send
     * @param message2         the message whose usage count is decremented when this method exits
     * @return {@code HANDLED} if the send succeeded, {@code BUSY} if it failed with an
     *         {@code ActiveMQException}
     */
    private HandleStatus deliverStandardMessage(SimpleString simpleString, MessageReference messageReference, Message message, Message message2) {
        logger.trace("going to send message: {} from {}", message, this.queue);
        try {
            this.producer.send(simpleString, message);
            return HandleStatus.HANDLED;
        } catch (ActiveMQException e) {
            ActiveMQServerLogger.LOGGER.bridgeUnableToSendMessage(messageReference, e);
            synchronized (this.refs) {
                this.refs.remove(Long.valueOf(message.getMessageID()));
                ((QueueImpl) messageReference.getQueue()).decDelivering(messageReference);
                connectionFailed(e, false);
            }
            return HandleStatus.BUSY;
        } finally {
            // Single release point instead of one copy per exit path.
            message2.usageDown();
        }
    }

    /**
     * @return the current value of the {@code targetNode} field; may be {@code null}
     */
    public TopologyMember getTargetNodeFromTopology() {
        return targetNode;
    }

    /**
     * @return the metrics object held by this bridge
     */
    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public BridgeMetrics getMetrics() {
        return metrics;
    }

    /**
     * @return a diagnostic string made of the simple class name, the identity hash code in hex,
     *         the configured name, the queue and the server locator
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder(getClass().getSimpleName());
        text.append("@").append(Integer.toHexString(System.identityHashCode(this)));
        text.append(" [name=").append(this.configuration.getName());
        text.append(", queue=").append(String.valueOf(this.queue));
        text.append(" targetConnector=").append(String.valueOf(this.serverLocator));
        text.append("]");
        return text.toString();
    }

    /**
     * @return a short description made of the simple class name, the configured name and the
     *         queue's name and id separated by a slash
     */
    @Override // org.apache.activemq.artemis.core.server.Consumer
    public String toManagementString() {
        final String typeName = getClass().getSimpleName();
        final String bridgeName = this.configuration.getName();
        final String queueLabel = this.queue.getName() + "/" + this.queue.getID();
        return typeName + " [name=" + bridgeName + ", queue=" + queueLabel + "]";
    }

    /**
     * @return the transformer held by this bridge; may be {@code null}
     */
    public Transformer getTransformer() {
        return transformer;
    }

    /**
     * @return the configuration object this bridge reads its settings from
     */
    @Override // org.apache.activemq.artemis.core.server.cluster.Bridge
    public BridgeConfiguration getConfiguration() {
        return configuration;
    }

    /**
     * @return the current value of the {@code state} field
     */
    public State getState() {
        return state;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Takes the bridge out of service after a failure.
     *
     * <p>When a target node id is known, the bridge is flagged as disconnected-and-down and the
     * server locator is told the node is down. The bridge is then removed as a consumer from
     * its queue (errors are only logged at debug level), outstanding references are cancelled,
     * and the queue is asked to deliver again.
     *
     * @param z  logged as the {@code permanently} value; not otherwise read here
     * @param z2 not read by this implementation
     */
    public void fail(boolean z, boolean z2) {
        logger.debug("{}\n\t::fail being called, permanently={}", this, z);

        if (this.targetNodeID != null) {
            this.disconnectedAndDown = true;
            final long now = System.currentTimeMillis();
            this.serverLocator.notifyNodeDown(now, this.targetNodeID);
        }

        if (this.queue != null) {
            try {
                logger.trace("Removing consumer on fail {} from queue {}", this, this.queue);
                this.queue.removeConsumer(this);
            } catch (Exception removalFailure) {
                logger.debug(removalFailure.getMessage(), removalFailure);
            }
        }

        cancelRefs();

        if (this.queue != null) {
            this.queue.deliverAsync();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resets reconnect bookkeeping after a connection has been established.
     *
     * <p>If the bridge had previously flagged its target as disconnected-and-down and both the
     * target node id and the target node are known, the server locator is notified that the
     * node is up again and the flag is cleared. The retry counter is reset to zero, the
     * reconnect-attempts limit in use is reloaded from the configuration, and any scheduled
     * reconnection is cancelled and forgotten.
     *
     * @throws Exception declared for overriding implementations; nothing in this body throws a
     *                   checked exception explicitly
     */
    public void afterConnect() throws Exception {
        if (this.disconnectedAndDown && this.targetNodeID != null && this.targetNode != null) {
            // Diamond instead of a raw Pair so the connector pair is type-checked.
            this.serverLocator.notifyNodeUp(System.currentTimeMillis(), this.targetNodeID, this.targetNode.getBackupGroupName(), this.targetNode.getScaleDownGroupName(), new Pair<>(this.targetNode.getPrimary(), this.targetNode.getBackup()), false);
            this.disconnectedAndDown = false;
        }
        this.retryCount = 0;
        this.reconnectAttemptsInUse = this.configuration.getReconnectAttempts();
        if (this.scheduledReconnection != null) {
            this.scheduledReconnection.cancel(true);
            this.scheduledReconnection = null;
        }
    }

    /**
     * Creates the session factory for the next connection attempt and stores it in {@code csf}.
     *
     * <p>The original target node is retried while a target node id is known and the
     * same-node attempt limit is either negative or not yet exceeded by the retry count.
     * Otherwise the locator is reset to its initial connectors and a factory is created from
     * those. A non-null factory has its reconnect attempts set to zero.
     *
     * @return the factory now stored in {@code csf}; may be {@code null}
     * @throws Exception if creating the factory fails
     */
    protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
        if (this.targetNodeID != null
            && (this.configuration.getReconnectAttemptsOnSameNode() < 0 || this.retryCount <= this.configuration.getReconnectAttemptsOnSameNode())) {
            this.csf = reconnectOnOriginalNode();
        } else {
            this.serverLocator.resetToInitialConnectors();
            this.csf = this.serverLocator.createSessionFactory();
        }
        if (this.csf == null) {
            return this.csf;
        }
        this.csf.setReconnectAttempts(0);
        return this.csf;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a session factory towards one of the connectors (primary and/or backup) of the
     * current target node, picking the connector by rotating on the retry count.
     *
     * <p>The index is computed with {@link Math#floorMod(int, int)} so that a retry count of
     * zero cannot produce a negative array index.
     *
     * @return a session factory for the selected connector, or {@code null} when the target
     *         node id or target node is unknown, or the node exposes neither connector
     * @throws Exception if the server locator fails to create the factory
     */
    public ClientSessionFactoryInternal reconnectOnOriginalNode() throws Exception {
        String nodeId = this.targetNodeID;
        TopologyMember member = this.targetNode;
        if (nodeId == null || member == null) {
            return null;
        }
        TransportConfiguration[] connectors = new TransportConfiguration[2];
        int connectorCount = 0;
        TransportConfiguration primary = member.getPrimary();
        if (primary != null) {
            connectors[connectorCount++] = primary;
        }
        TransportConfiguration backup = member.getBackup();
        if (backup != null) {
            connectors[connectorCount++] = backup;
        }
        if (connectorCount == 0) {
            return null;
        }
        // Alternates between the available connectors on successive retries.
        int selected = Math.floorMod(this.retryCount - 1, connectorCount);
        return this.serverLocator.createSessionFactory(connectors[selected]);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Replaces the session factory stored in {@code csf}.
     *
     * @param clientSessionFactoryInternal the factory to store; no validation is performed
     */
    public void setSessionFactory(ClientSessionFactoryInternal clientSessionFactoryInternal) {
        csf = clientSessionFactoryInternal;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Decides whether another connection attempt should be made and, if so, after what delay.
     *
     * <p>No retry is scheduled when the server locator is closed, when the bridge is stopping
     * or pausing, or when the retry count exceeds a non-negative attempts limit (in which case
     * {@code fail(true, false)} is called). Otherwise the delay is
     * {@code retryInterval * retryIntervalMultiplier ^ retryCount} truncated to a long, replaced
     * by the plain retry interval if that truncates to zero, and capped at the configured
     * maximum retry interval.
     */
    public void scheduleRetryConnect() {
        if (this.serverLocator.isClosed()) {
            ActiveMQServerLogger.LOGGER.bridgeLocatorShutdown();
            return;
        }

        if (this.state == State.STOPPING || this.state == State.PAUSING) {
            final String activity;
            if (this.state == State.STOPPING) {
                activity = "stopping";
            } else {
                activity = "pausing";
            }
            ActiveMQServerLogger.LOGGER.bridgeWillNotRetry(activity);
            return;
        }

        final boolean attemptsExhausted = this.reconnectAttemptsInUse >= 0 && this.retryCount > this.reconnectAttemptsInUse;
        if (attemptsExhausted) {
            ActiveMQServerLogger.LOGGER.bridgeAbortStart(this.configuration.getName(), Integer.valueOf(this.retryCount), Integer.valueOf(this.configuration.getReconnectAttempts()));
            fail(true, false);
            return;
        }

        final double backoffFactor = Math.pow(this.configuration.getRetryIntervalMultiplier(), this.retryCount);
        long delay = (long) (this.configuration.getRetryInterval() * backoffFactor);
        if (delay == 0) {
            delay = this.configuration.getRetryInterval();
        }
        if (delay > this.configuration.getMaxRetryInterval()) {
            delay = this.configuration.getMaxRetryInterval();
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Bridge {} retrying connection #{}, maxRetry={}, timeout={}", this, this.retryCount, this.reconnectAttemptsInUse, delay);
        }
        scheduleRetryConnectFixedTimeout(delay);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Updates the bridge's view of its target node from a topology member.
     *
     * <p>A {@code null} member is ignored. If the member's node id equals the current target
     * node id, only the target node is refreshed. Otherwise the member becomes the new target
     * (node and node id) only when the current session has a connection and the member reports
     * that connection as one of its own.
     *
     * @param topologyMember the topology member to consider; may be {@code null}
     * @param z              not read by this implementation
     */
    public void nodeUP(TopologyMember topologyMember, boolean z) {
        if (topologyMember == null) {
            return;
        }

        // Read the session field once so the null check and the call see the same object.
        final ClientSessionInternal currentSession = this.session;
        RemotingConnection connection = null;
        if (currentSession != null) {
            connection = currentSession.getConnection();
        }

        if (this.targetNodeID != null && this.targetNodeID.equals(topologyMember.getNodeId())) {
            this.targetNode = topologyMember;
            return;
        }

        if (connection != null && topologyMember.isMember(connection)) {
            this.targetNode = topologyMember;
            this.targetNodeID = topologyMember.getNodeId();
        }
    }

    /**
     * Cleans up the current session factory and schedules a connect attempt after a fixed delay.
     *
     * <p>The cleanup is best effort: a failure is logged at debug level and does not prevent
     * scheduling. Nothing is scheduled when the bridge is stopping, stopped, pausing or paused.
     * The resulting future is stored in {@code scheduledReconnection}.
     *
     * @param j delay before the connect attempt, in milliseconds
     */
    protected void scheduleRetryConnectFixedTimeout(long j) {
        try {
            cleanUpSessionFactory(this.csf);
        } catch (Throwable th) {
            // Best-effort cleanup: record the problem instead of silently dropping it.
            logger.debug("Bridge {} failed to clean up its session factory before scheduling a retry", this.configuration.getName(), th);
        }
        if (this.state == State.STOPPING || this.state == State.STOPPED || this.state == State.PAUSING || this.state == State.PAUSED) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Scheduling retry for bridge {} in {} milliseconds", this.configuration.getName(), Long.valueOf(j));
        }
        this.scheduledReconnection = this.scheduledExecutor.schedule(new ScheduledConnectRunnable(), j, TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels the references tracked by this bridge and, when a queue is set, asks it to
     * deliver again.
     */
    private void internalCancelReferences() {
        cancelRefs();
        if (this.queue == null) {
            return;
        }
        this.queue.deliverAsync();
    }

    /**
     * Clears the flag that marks a large message as being delivered. The write happens while
     * holding this object's monitor.
     */
    private void unsetLargeMessageDelivery() {
        synchronized (this) {
            deliveringLargeMessage = false;
        }
    }

    /**
     * Sends a management notification of the given type carrying this bridge's name under the
     * {@code "name"} property. Does nothing when no notification service is set. A failure to
     * send is reported through the server logger and not rethrown.
     *
     * @param coreNotificationType the type of notification to send
     */
    private void sendNotification(CoreNotificationType coreNotificationType) {
        if (this.notificationService == null) {
            return;
        }
        final TypedProperties properties = new TypedProperties();
        properties.putSimpleStringProperty(SimpleString.of("name"), getName());
        try {
            final Notification notification = new Notification(this.nodeUUID.toString(), coreNotificationType, properties);
            this.notificationService.sendNotification(notification);
        } catch (Exception sendFailure) {
            ActiveMQServerLogger.LOGGER.notificationBridgeError(this.configuration.getName(), coreNotificationType, sendFailure);
        }
    }
}
