package org.springframework.messaging.simp.stomp;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.ResolvableType;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SimpleMessageConverter;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.AlternativeJdkIdGenerator;
import org.springframework.util.Assert;
import org.springframework.util.IdGenerator;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SettableListenableFuture;

/* loaded from: input_file:BOOT-INF/lib/spring-messaging-4.3.16.RELEASE.jar:org/springframework/messaging/simp/stomp/DefaultStompSession.class */
public class DefaultStompSession implements ConnectionHandlingStompSession {
    /** Factor applied to the negotiated read interval before read inactivity is reported. */
    private static final long HEARTBEAT_MULTIPLIER = 3;
    /** Id generated once in the constructor; stamped on every outgoing and incoming header accessor. */
    private final String sessionId;
    /** Application callback for lifecycle events, ERROR frames, exceptions and transport errors. */
    private final StompSessionHandler sessionHandler;
    /** Headers copied into the CONNECT frame; also the source of the client heartbeat settings. */
    private final StompHeaders connectHeaders;
    /** Scheduler used for receipt timeouts; receipts cannot be tracked while this is null. */
    private TaskScheduler taskScheduler;
    /** When true, a receipt header is generated for frames that do not already carry one. */
    private volatile boolean autoReceiptEnabled;
    /** Current connection, or null when not connected (cleared by resetConnection). */
    private volatile TcpConnection<byte[]> connection;
    /** Value of the "version" header of the CONNECTED frame. */
    private volatile String version;
    private static final Log logger = LogFactory.getLog(DefaultStompSession.class);
    private static final IdGenerator idGenerator = new AlternativeJdkIdGenerator();
    /** Shared zero-length payload used for frames without a body. */
    public static final byte[] EMPTY_PAYLOAD = new byte[0];
    /** Pre-built heartbeat message sent by the write-inactivity task. */
    private static final Message<byte[]> HEARTBEAT = MessageBuilder.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, StompHeaderAccessor.createForHeartbeat().getMessageHeaders());
    /** Completed with this session on CONNECTED, or with an exception on failure. */
    private final SettableListenableFuture<StompSession> sessionFuture = new SettableListenableFuture<>();
    private MessageConverter converter = new SimpleMessageConverter();
    /** Milliseconds to wait for a RECEIPT before the receipt is treated as lost. */
    private long receiptTimeLimit = 15000;
    /** Source of generated subscription ids. */
    private final AtomicInteger subscriptionIndex = new AtomicInteger();
    /** Active subscriptions keyed by subscription id. */
    private final Map<String, DefaultSubscription> subscriptions = new ConcurrentHashMap<>(4);
    /** Source of generated receipt ids. */
    private final AtomicInteger receiptIndex = new AtomicInteger();
    /** Pending receipt handlers keyed by receipt id. */
    private final Map<String, ReceiptHandler> receiptHandlers = new ConcurrentHashMap<>(4);
    /** Set when the session itself initiates the close, so afterConnectionClosed stays quiet. */
    private volatile boolean closing = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-messaging-4.3.16.RELEASE.jar:org/springframework/messaging/simp/stomp/DefaultStompSession$DefaultSubscription.class */
    /**
     * Subscription handle that pairs a subscription id and destination with the
     * {@link StompFrameHandler} receiving its frames. Receipt tracking is inherited
     * from {@link ReceiptHandler}.
     */
    public class DefaultSubscription extends ReceiptHandler implements StompSession.Subscription {

        private final String id;

        private final String destination;

        private final StompFrameHandler handler;

        /**
         * Creates the subscription and registers it in the session's subscription map.
         * @param str the subscription id, used as the map key
         * @param str2 the destination; must not be {@code null}
         * @param str3 the receipt id handed to {@link ReceiptHandler}
         * @param stompFrameHandler the handler for frames of this subscription; must not be {@code null}
         */
        public DefaultSubscription(String str, String str2, String str3, StompFrameHandler stompFrameHandler) {
            super(str3);
            Assert.notNull(str2, "Destination must not be null");
            Assert.notNull(stompFrameHandler, "StompFrameHandler must not be null");
            this.handler = stompFrameHandler;
            this.destination = str2;
            this.id = str;
            DefaultStompSession.this.subscriptions.put(this.id, this);
        }

        /** Returns the id this subscription was created with. */
        @Override
        public String getSubscriptionId() {
            return id;
        }

        /** Returns the destination this subscription was created with. */
        public String getDestination() {
            return destination;
        }

        /** Returns the frame handler this subscription was created with. */
        public StompFrameHandler getHandler() {
            return handler;
        }

        /**
         * Removes this subscription from the session's map, then sends an
         * UNSUBSCRIBE frame for its id through the enclosing session.
         */
        @Override
        public void unsubscribe() {
            String subscriptionId = getSubscriptionId();
            DefaultStompSession.this.subscriptions.remove(subscriptionId);
            DefaultStompSession.this.unsubscribe(subscriptionId);
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Subscription [id=");
            text.append(getSubscriptionId());
            text.append(", destination='").append(getDestination());
            text.append("', receiptId='").append(getReceiptId());
            text.append("', handler=").append(getHandler());
            return text.append("]").toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-messaging-4.3.16.RELEASE.jar:org/springframework/messaging/simp/stomp/DefaultStompSession$ReadInactivityTask.class */
    /**
     * Task registered for read inactivity: marks the session as closing, drops the
     * connection and reports an {@link IllegalStateException} through
     * {@code handleFailure}.
     */
    public class ReadInactivityTask implements Runnable {

        private ReadInactivityTask() {
        }

        @Override
        public void run() {
            final DefaultStompSession session = DefaultStompSession.this;
            // Flag first so that afterConnectionClosed() does not report the close again.
            session.closing = true;
            final String reason = "Server has gone quiet. Closing connection in session id=" + session.sessionId + ".";
            if (logger.isDebugEnabled()) {
                logger.debug(reason);
            }
            session.resetConnection();
            session.handleFailure(new IllegalStateException(reason));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-messaging-4.3.16.RELEASE.jar:org/springframework/messaging/simp/stomp/DefaultStompSession$ReceiptHandler.class */
    public class ReceiptHandler implements StompSession.Receiptable {
        private final String receiptId;
        private final List<Runnable> receiptCallbacks = new ArrayList(2);
        private final List<Runnable> receiptLostCallbacks = new ArrayList(2);
        private ScheduledFuture<?> future;
        private Boolean result;

        public ReceiptHandler(String str) {
            this.receiptId = str;
            if (this.receiptId != null) {
                initReceiptHandling();
            }
        }

        private void initReceiptHandling() {
            Assert.notNull(DefaultStompSession.this.getTaskScheduler(), "To track receipts, a TaskScheduler must be configured");
            DefaultStompSession.this.receiptHandlers.put(this.receiptId, this);
            this.future = DefaultStompSession.this.getTaskScheduler().schedule(new Runnable() { // from class: org.springframework.messaging.simp.stomp.DefaultStompSession.ReceiptHandler.1
                @Override // java.lang.Runnable
                public void run() {
                    ReceiptHandler.this.handleReceiptNotReceived();
                }
            }, new Date(System.currentTimeMillis() + DefaultStompSession.this.getReceiptTimeLimit()));
        }

        @Override // org.springframework.messaging.simp.stomp.StompSession.Receiptable
        public String getReceiptId() {
            return this.receiptId;
        }

        @Override // org.springframework.messaging.simp.stomp.StompSession.Receiptable
        public void addReceiptTask(Runnable runnable) {
            addTask(runnable, true);
        }

        @Override // org.springframework.messaging.simp.stomp.StompSession.Receiptable
        public void addReceiptLostTask(Runnable runnable) {
            addTask(runnable, false);
        }

        private void addTask(Runnable runnable, boolean z) {
            Assert.notNull(this.receiptId, "To track receipts, set autoReceiptEnabled=true or add 'receiptId' header");
            synchronized (this) {
                if (this.result != null && this.result.booleanValue() == z) {
                    invoke(Collections.singletonList(runnable));
                } else if (z) {
                    this.receiptCallbacks.add(runnable);
                } else {
                    this.receiptLostCallbacks.add(runnable);
                }
            }
        }

        private void invoke(List<Runnable> list) {
            Iterator<Runnable> it = list.iterator();
            while (it.hasNext()) {
                try {
                    it.next().run();
                } catch (Throwable th) {
                }
            }
        }

        public void handleReceiptReceived() {
            handleInternal(true);
        }

        public void handleReceiptNotReceived() {
            handleInternal(false);
        }

        private void handleInternal(boolean z) {
            synchronized (this) {
                if (this.result != null) {
                    return;
                }
                this.result = Boolean.valueOf(z);
                invoke(z ? this.receiptCallbacks : this.receiptLostCallbacks);
                DefaultStompSession.this.receiptHandlers.remove(this.receiptId);
                if (this.future != null) {
                    this.future.cancel(true);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-messaging-4.3.16.RELEASE.jar:org/springframework/messaging/simp/stomp/DefaultStompSession$WriteInactivityTask.class */
    /**
     * Task registered for write inactivity: sends the shared heartbeat message on
     * the current connection, if there is one, and routes a send failure to
     * {@code handleFailure}.
     */
    public class WriteInactivityTask implements Runnable {

        private WriteInactivityTask() {
        }

        @Override
        public void run() {
            TcpConnection<byte[]> current = DefaultStompSession.this.connection;
            if (current == null) {
                return;
            }
            ListenableFuture<Void> pending = current.send(HEARTBEAT);
            pending.addCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void ignored) {
                    // Nothing to do once the heartbeat has been written.
                }

                @Override
                public void onFailure(Throwable cause) {
                    handleFailure(cause);
                }
            });
        }
    }

    /**
     * Creates a session with a freshly generated session id.
     * @param stompSessionHandler the application handler for this session; must not be {@code null}
     * @param stompHeaders the headers to use for the CONNECT frame; must not be {@code null}
     */
    public DefaultStompSession(StompSessionHandler stompSessionHandler, StompHeaders stompHeaders) {
        Assert.notNull(stompSessionHandler, "StompSessionHandler must not be null");
        Assert.notNull(stompHeaders, "StompHeaders must not be null");
        this.sessionHandler = stompSessionHandler;
        this.connectHeaders = stompHeaders;
        this.sessionId = String.valueOf(idGenerator.generateId());
    }

    /** Returns the id that was generated for this session in the constructor. */
    @Override
    public String getSessionId() {
        return sessionId;
    }

    /** Returns the handler this session was constructed with. */
    public StompSessionHandler getSessionHandler() {
        return sessionHandler;
    }

    /**
     * Returns the future that is completed with this session when a CONNECTED
     * frame is handled, or with an exception when a failure is reported.
     */
    @Override
    public ListenableFuture<StompSession> getSessionFuture() {
        return sessionFuture;
    }

    /**
     * Replaces the converter used for outgoing payloads and incoming frames.
     * @param messageConverter the converter to use; must not be {@code null}
     */
    public void setMessageConverter(MessageConverter messageConverter) {
        Assert.notNull(messageConverter, "MessageConverter must not be null");
        converter = messageConverter;
    }

    /** Returns the configured converter (a {@code SimpleMessageConverter} unless replaced). */
    public MessageConverter getMessageConverter() {
        return converter;
    }

    /**
     * Sets the scheduler used to time out receipts.
     * @param taskScheduler the scheduler; receipt tracking requires it to be non-null
     */
    public void setTaskScheduler(TaskScheduler taskScheduler) {
        final TaskScheduler scheduler = taskScheduler;
        this.taskScheduler = scheduler;
    }

    /** Returns the configured scheduler, or {@code null} if none was set. */
    public TaskScheduler getTaskScheduler() {
        return taskScheduler;
    }

    /**
     * Sets how long to wait for a receipt before it is treated as lost.
     * @param j the limit in milliseconds; must be greater than zero
     */
    public void setReceiptTimeLimit(long j) {
        boolean positive = j > 0;
        Assert.isTrue(positive, "Receipt time limit must be larger than zero");
        receiptTimeLimit = j;
    }

    /** Returns the receipt time limit in milliseconds (15000 unless changed). */
    public long getReceiptTimeLimit() {
        return receiptTimeLimit;
    }

    /**
     * Enables or disables automatic generation of a receipt header for frames
     * that do not already carry one.
     */
    @Override
    public void setAutoReceipt(boolean z) {
        autoReceiptEnabled = z;
    }

    /** Returns whether receipt headers are generated automatically. */
    public boolean isAutoReceiptEnabled() {
        return autoReceiptEnabled;
    }

    /** Returns {@code true} while a connection reference is held by this session. */
    @Override
    public boolean isConnected() {
        TcpConnection<byte[]> current = this.connection;
        return current != null;
    }

    /**
     * Sends a payload to a destination by building headers that carry only the
     * destination and delegating to {@link #send(StompHeaders, Object)}.
     * @param str the destination
     * @param obj the payload; may be {@code null}
     */
    @Override
    public StompSession.Receiptable send(String str, Object obj) {
        StompHeaders headers = new StompHeaders();
        headers.setDestination(str);
        return send(headers, obj);
    }

    /**
     * Sends a SEND frame with the given headers and payload.
     * @param stompHeaders the frame headers; a destination with text is required.
     * A receipt header may be added to this instance when auto-receipt is enabled
     * @param obj the payload; may be {@code null}
     * @return the handle used to track the receipt of the frame
     */
    @Override
    public StompSession.Receiptable send(StompHeaders stompHeaders, Object obj) {
        Assert.hasText(stompHeaders.getDestination(), "Destination header is required");
        // The receipt handler is created before the frame is written.
        String receiptId = checkOrAddReceipt(stompHeaders);
        ReceiptHandler receiptable = new ReceiptHandler(receiptId);
        StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SEND);
        accessor.addNativeHeaders(stompHeaders);
        Message<byte[]> frame = createMessage(accessor, obj);
        execute(frame);
        return receiptable;
    }

    /**
     * Returns the receipt id already present in the headers. If there is none and
     * auto-receipt is enabled, generates the next id, stores it in the headers and
     * returns it; otherwise returns {@code null}.
     */
    private String checkOrAddReceipt(StompHeaders headers) {
        String existing = headers.getReceipt();
        if (existing != null || !isAutoReceiptEnabled()) {
            return existing;
        }
        String generated = String.valueOf(this.receiptIndex.getAndIncrement());
        headers.setReceipt(generated);
        return generated;
    }

    /**
     * Creates a header accessor for the given command, stamped with this
     * session's id and left mutable.
     */
    private StompHeaderAccessor createHeaderAccessor(StompCommand command) {
        StompHeaderAccessor accessor = StompHeaderAccessor.create(command);
        accessor.setSessionId(getSessionId());
        accessor.setLeaveMutable(true);
        return accessor;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Builds the outgoing message for the given headers and payload.
     * A {@code null} payload becomes {@link #EMPTY_PAYLOAD}, a {@code byte[]} is
     * used as-is, and anything else goes through the configured converter.
     * @throws MessageConversionException if the converter returns {@code null}
     */
    private Message<byte[]> createMessage(StompHeaderAccessor accessor, Object payload) {
        accessor.updateSimpMessageHeadersFromStompHeaders();
        if (payload == null) {
            return MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
        }
        if (payload instanceof byte[]) {
            return MessageBuilder.createMessage((byte[]) payload, accessor.getMessageHeaders());
        }
        // The converter API is not parameterized on the payload type; the caller's
        // contract is byte[] payloads, hence the narrowly scoped unchecked cast.
        @SuppressWarnings("unchecked")
        Message<byte[]> converted = (Message<byte[]>) getMessageConverter().toMessage(payload, accessor.getMessageHeaders());
        // Synced before the null check so the error message sees the updated content type.
        accessor.updateStompHeadersFromSimpMessageHeaders();
        if (converted == null) {
            throw new MessageConversionException("Unable to convert payload with type='" + payload.getClass().getName() + "', contentType='" + accessor.getContentType() + "', converter=[" + getMessageConverter() + "]");
        }
        return converted;
    }

    /**
     * Writes the message on the current connection and blocks until the send
     * completes.
     * @throws IllegalStateException if there is no connection
     * @throws MessageDeliveryException if the send fails or the wait is interrupted
     */
    private void execute(Message<byte[]> message) {
        if (logger.isTraceEnabled()) {
            StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
            // A converter-built message may not carry the accessor; logging must not fail the send.
            if (accessor != null) {
                logger.trace("Sending " + accessor.getDetailedLogMessage(message.getPayload()));
            }
        }
        // Snapshot the volatile field so the null check and the send use the same connection.
        TcpConnection<byte[]> conn = this.connection;
        Assert.state(conn != null, "Connection closed");
        try {
            conn.send(message).get();
        } catch (ExecutionException ex) {
            throw new MessageDeliveryException(message, ex.getCause());
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            throw new MessageDeliveryException(message, ex);
        } catch (Throwable ex) {
            throw new MessageDeliveryException(message, ex);
        }
    }

    /**
     * Subscribes to a destination by building headers that carry only the
     * destination and delegating to {@link #subscribe(StompHeaders, StompFrameHandler)}.
     */
    @Override
    public StompSession.Subscription subscribe(String str, StompFrameHandler stompFrameHandler) {
        StompHeaders headers = new StompHeaders();
        headers.setDestination(str);
        return subscribe(headers, stompFrameHandler);
    }

    /**
     * Registers a subscription and sends the SUBSCRIBE frame.
     * @param stompHeaders the frame headers; a destination with text is required.
     * A generated id (and, with auto-receipt, a receipt) may be added to this instance
     * @param stompFrameHandler the handler for frames of the subscription; must not be {@code null}
     * @return the subscription handle
     */
    @Override
    public StompSession.Subscription subscribe(StompHeaders stompHeaders, StompFrameHandler stompFrameHandler) {
        String destination = stompHeaders.getDestination();
        Assert.hasText(destination, "Destination header is required");
        Assert.notNull(stompFrameHandler, "StompFrameHandler must not be null");

        String subscriptionId = stompHeaders.getId();
        if (!StringUtils.hasText(subscriptionId)) {
            // No usable id supplied: take the next one from the session counter.
            subscriptionId = String.valueOf(this.subscriptionIndex.getAndIncrement());
            stompHeaders.setId(subscriptionId);
        }

        String receiptId = checkOrAddReceipt(stompHeaders);
        // Constructing the subscription also registers it in the subscription map.
        DefaultSubscription subscription = new DefaultSubscription(subscriptionId, destination, receiptId, stompFrameHandler);

        StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SUBSCRIBE);
        accessor.addNativeHeaders(stompHeaders);
        Message<byte[]> frame = createMessage(accessor, EMPTY_PAYLOAD);
        execute(frame);
        return subscription;
    }

    /**
     * Sends an ACK or NACK frame for a message.
     * @param str the message id; written to the message-id header when the
     * negotiated version is "1.1", otherwise to the id header
     * @param z {@code true} to send ACK, {@code false} to send NACK
     * @return the handle used to track the receipt of the frame
     */
    @Override
    public StompSession.Receiptable acknowledge(String str, boolean z) {
        StompHeaders headers = new StompHeaders();
        boolean stomp11 = "1.1".equals(this.version);
        if (stomp11) {
            headers.setMessageId(str);
        } else {
            headers.setId(str);
        }
        String receiptId = checkOrAddReceipt(headers);
        ReceiptHandler receiptable = new ReceiptHandler(receiptId);

        StompCommand command;
        if (z) {
            command = StompCommand.ACK;
        } else {
            command = StompCommand.NACK;
        }
        StompHeaderAccessor accessor = createHeaderAccessor(command);
        accessor.addNativeHeaders(headers);
        Message<byte[]> frame = createMessage(accessor, null);
        execute(frame);
        return receiptable;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends an UNSUBSCRIBE frame for the given subscription id. The subscription
     * map is not touched here.
     */
    public void unsubscribe(String str) {
        StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE);
        accessor.setSubscriptionId(str);
        Message<byte[]> frame = createMessage(accessor, EMPTY_PAYLOAD);
        execute(frame);
    }

    /**
     * Marks the session as closing, sends a DISCONNECT frame and drops the
     * connection. The connection is reset even when sending the frame fails.
     */
    @Override
    public void disconnect() {
        this.closing = true;
        try {
            StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.DISCONNECT);
            Message<byte[]> frame = createMessage(accessor, EMPTY_PAYLOAD);
            execute(frame);
        } finally {
            resetConnection();
        }
    }

    /**
     * Stores the new connection and sends the CONNECT frame, built from the
     * configured connect headers with accept-version "1.1,1.2".
     */
    @Override
    public void afterConnected(TcpConnection<byte[]> tcpConnection) {
        this.connection = tcpConnection;
        if (logger.isDebugEnabled()) {
            String text = "Connection established in session id=" + this.sessionId;
            logger.debug(text);
        }
        StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.CONNECT);
        accessor.addNativeHeaders(this.connectHeaders);
        // Set after the native headers have been added.
        accessor.setAcceptVersion("1.1,1.2");
        Message<byte[]> frame = createMessage(accessor, EMPTY_PAYLOAD);
        execute(frame);
    }

    /**
     * Fails the session future with the given cause and notifies the session
     * handler of the transport error.
     */
    @Override
    public void afterConnectFailure(Throwable th) {
        if (logger.isDebugEnabled()) {
            String text = "Failed to connect session id=" + this.sessionId;
            logger.debug(text, th);
        }
        sessionFuture.setException(th);
        sessionHandler.handleTransportError(this, th);
    }

    /**
     * Dispatches an incoming frame by command:
     * MESSAGE goes to the handler of the matching subscription, RECEIPT to the
     * matching receipt handler, CONNECTED completes the session future and
     * notifies the session handler, ERROR goes to the session handler. Anything
     * thrown during dispatch is passed to the session handler's
     * {@code handleException}.
     */
    @Override
    public void handleMessage(Message<byte[]> message) {
        final StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        accessor.setSessionId(this.sessionId);
        final StompCommand command = accessor.getCommand();
        final StompHeaders headers = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders());
        final boolean heartbeat = accessor.isHeartbeat();
        if (logger.isTraceEnabled()) {
            logger.trace("Received " + accessor.getDetailedLogMessage(message.getPayload()));
        }
        try {
            if (command == StompCommand.MESSAGE) {
                DefaultSubscription subscription = this.subscriptions.get(headers.getSubscription());
                if (subscription != null) {
                    invokeHandler(subscription.getHandler(), message, headers);
                } else if (logger.isDebugEnabled()) {
                    logger.debug("No handler for: " + accessor.getDetailedLogMessage(message.getPayload()) + ". Perhaps just unsubscribed?");
                }
            } else if (command == StompCommand.RECEIPT) {
                ReceiptHandler pending = this.receiptHandlers.get(headers.getReceiptId());
                if (pending != null) {
                    pending.handleReceiptReceived();
                } else if (logger.isDebugEnabled()) {
                    logger.debug("No matching receipt: " + accessor.getDetailedLogMessage(message.getPayload()));
                }
            } else if (command == StompCommand.CONNECTED) {
                // Heartbeats are set up first, then the session is published.
                initHeartbeatTasks(headers);
                this.version = headers.getFirst("version");
                this.sessionFuture.set(this);
                this.sessionHandler.afterConnected(this, headers);
            } else if (command == StompCommand.ERROR) {
                invokeHandler(this.sessionHandler, message, headers);
            } else if (!heartbeat && logger.isTraceEnabled()) {
                logger.trace("Message not handled.");
            }
        } catch (Throwable ex) {
            this.sessionHandler.handleException(this, command, headers, message.getPayload(), ex);
        }
    }

    /**
     * Delivers a frame to a handler. An empty payload is delivered as
     * {@code null}; otherwise the payload is converted to the type requested by
     * the handler for these headers.
     * @throws MessageConversionException if the converter returns {@code null}
     */
    private void invokeHandler(StompFrameHandler handler, Message<byte[]> message, StompHeaders headers) {
        boolean emptyPayload = message.getPayload().length == 0;
        if (emptyPayload) {
            handler.handleFrame(headers, null);
        } else {
            Class<?> targetType = ResolvableType.forType(handler.getPayloadType(headers)).resolve();
            Object payload = getMessageConverter().fromMessage(message, targetType);
            if (payload == null) {
                throw new MessageConversionException("No suitable converter, payloadType=" + targetType + ", handlerType=" + handler.getClass());
            }
            handler.handleFrame(headers, payload);
        }
    }

    /**
     * Registers the heartbeat tasks on the connection from the heartbeat values
     * of the CONNECT headers and of the received CONNECTED headers.
     * <p>Writes: when element 0 of the CONNECT heartbeat and element 1 of the
     * CONNECTED heartbeat are both positive, a write-inactivity task is
     * registered with the larger of the two as interval.
     * <p>Reads: when element 1 of the CONNECT heartbeat and element 0 of the
     * CONNECTED heartbeat are both positive, a read-inactivity task is registered
     * with the larger of the two multiplied by {@code HEARTBEAT_MULTIPLIER}.
     * <p>Nothing is registered if either side has no heartbeat header value.
     * @throws IllegalStateException if a task must be registered but the
     * connection has already been reset
     */
    private void initHeartbeatTasks(StompHeaders connectedHeaders) {
        long[] connect = this.connectHeaders.getHeartbeat();
        long[] connected = connectedHeaders.getHeartbeat();
        if (connect == null || connected == null) {
            return;
        }
        boolean writeEnabled = connect[0] > 0 && connected[1] > 0;
        boolean readEnabled = connect[1] > 0 && connected[0] > 0;
        if (!writeEnabled && !readEnabled) {
            return;
        }
        // Read the volatile field once: a concurrent resetConnection() must not leave
        // one task registered and then fail with a NullPointerException on the other.
        TcpConnection<byte[]> conn = this.connection;
        Assert.state(conn != null, "Connection closed");
        if (writeEnabled) {
            conn.onWriteInactivity(new WriteInactivityTask(), Math.max(connect[0], connected[1]));
        }
        if (readEnabled) {
            conn.onReadInactivity(new ReadInactivityTask(), Math.max(connect[1], connected[0]) * HEARTBEAT_MULTIPLIER);
        }
    }

    /**
     * Fails the session future with the given cause and notifies the session
     * handler of the transport error. Anything thrown while doing so is logged
     * at debug level and not propagated.
     */
    @Override
    public void handleFailure(Throwable th) {
        try {
            sessionFuture.setException(th);
            sessionHandler.handleTransportError(this, th);
        } catch (Throwable secondary) {
            if (!logger.isDebugEnabled()) {
                return;
            }
            logger.debug("Uncaught failure while handling transport failure", secondary);
        }
    }

    /**
     * Reacts to the connection being closed. Unless the session itself initiated
     * the close, the connection is reset and a {@link ConnectionLostException} is
     * reported through {@link #handleFailure(Throwable)}.
     */
    @Override
    public void afterConnectionClosed() {
        if (logger.isDebugEnabled()) {
            String text = "Connection closed in session id=" + this.sessionId;
            logger.debug(text);
        }
        if (!this.closing) {
            resetConnection();
            handleFailure(new ConnectionLostException("Connection closed"));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Clears the connection reference and closes the connection it pointed to,
     * if any. Closing is best-effort: a failure is logged at debug level and not
     * propagated.
     */
    public void resetConnection() {
        TcpConnection<byte[]> conn = this.connection;
        // Clear the field before closing so the session reports as disconnected right away.
        this.connection = null;
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (Throwable ex) {
            if (logger.isDebugEnabled()) {
                logger.debug("Failed to close connection in session id=" + this.sessionId, ex);
            }
        }
    }
}
