package org.wso2.carbon.cache.sync.jms.manager;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.cache.CacheEntryInfo;
import javax.cache.CacheInvalidationRequestSender;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.caching.impl.clustering.ClusterCacheInvalidationRequest;
import org.wso2.carbon.context.PrivilegedCarbonContext;

/* loaded from: input_file:org/wso2/carbon/cache/sync/jms/manager/JMSProducer.class */
/**
 * Publishes cache invalidation requests to a JMS topic so that other nodes can react to
 * local cache removals and updates.
 *
 * <p>The class is a lazily created singleton. Cache events are converted into
 * {@link ClusterCacheInvalidationRequest} objects and handed to a fixed-size thread pool,
 * whose workers publish them as JMS text messages.
 *
 * <p>Thread-safety: the JMS {@link Session} and {@link MessageProducer} are shared by all
 * worker threads, so every access to the connection state is serialized on
 * {@code connectionLock}.
 */
public class JMSProducer implements CacheEntryRemovedListener, CacheEntryUpdatedListener, CacheEntryCreatedListener, CacheInvalidationRequestSender {
    private static final Log log = LogFactory.getLog(JMSProducer.class);
    private static final ExecutorService executorService = Executors.newFixedThreadPool(15);

    /** Number of failed connection attempts tolerated before {@link #retryConnection()} gives up. */
    private static final int MAX_FAILED_CONNECTION_ATTEMPTS = 10;

    /** Tenant id value that marks a request without tenant information. */
    private static final int INVALID_TENANT_ID = -1;

    private static volatile JMSProducer instance;

    /** Guards {@code topic}, {@code connection}, {@code session} and {@code producer}. */
    private final Object connectionLock = new Object();
    private final ConnectionFactory connectionFactory;
    private final InitialContext initialContext;
    private Topic topic;
    // volatile: send() inspects these without holding connectionLock.
    private volatile Connection connection;
    private volatile Session session;
    private MessageProducer producer;

    /**
     * Creates the JNDI context and looks up the connection factory.
     *
     * @throws RuntimeException if the JMS client resources cannot be initialized
     */
    private JMSProducer() {
        try {
            this.initialContext = JMSUtils.createInitialContext();
            this.connectionFactory = JMSUtils.getConnectionFactory(this.initialContext);
        } catch (NamingException | JMSException | IOException e) {
            throw new RuntimeException("Error initializing JMS client resources", e);
        }
    }

    /**
     * Returns the shared producer, creating it on first use.
     *
     * @return the singleton instance
     */
    public static JMSProducer getInstance() {
        JMSProducer current = instance;
        if (current == null) {
            synchronized (JMSProducer.class) {
                current = instance;
                if (current == null) {
                    current = new JMSProducer();
                    instance = current;
                }
            }
        }
        return current;
    }

    /**
     * Opens the JMS connection when MB based cache invalidation is enabled; otherwise does nothing.
     *
     * @throws RuntimeException if the connection cannot be started
     */
    public void startService() {
        if (!JMSUtils.isMBCacheInvalidatorEnabled().booleanValue()) {
            log.debug("JMS MB based cache invalidation is not enabled.");
            return;
        }
        try {
            startConnection();
        } catch (JMSException | NamingException e) {
            throw new RuntimeException("Error starting JMS connection ", e);
        }
    }

    /**
     * Queues an invalidation message for the given cache entry.
     *
     * <p>Nothing is sent when MB based invalidation is disabled, when the entry carries no tenant
     * id, when the cache name does not start with {@code JMSUtils.LOCAL_CACHE_PREFIX}, or when
     * {@code JMSUtils.isAllowedToPropagate} rejects the cache.
     *
     * @param cacheEntryInfo description of the cache entry that changed
     */
    @Override
    @SuppressFBWarnings
    public void send(CacheEntryInfo cacheEntryInfo) {
        String tenantDomain = cacheEntryInfo.getTenantDomain();
        int tenantId = cacheEntryInfo.getTenantId();
        if (!JMSUtils.isMBCacheInvalidatorEnabled().booleanValue()) {
            log.debug("MB based cache invalidation is not enabled");
            return;
        }
        if (this.connection == null || this.session == null) {
            log.debug("JMS Producer connection is not initialized");
            retryConnection();
        }
        if (INVALID_TENANT_ID == tenantId) {
            if (log.isDebugEnabled()) {
                log.debug("Tenant information cannot be found in the request. This originated from: \n" + ExceptionUtils.getStackTrace(new Throwable()));
            }
            return;
        }
        String cacheName = cacheEntryInfo.getCacheName();
        if (!cacheName.startsWith(JMSUtils.LOCAL_CACHE_PREFIX)) {
            return;
        }
        String cacheManagerName = cacheEntryInfo.getCacheManagerName();
        if (!JMSUtils.isAllowedToPropagate(cacheManagerName, cacheName)) {
            if (log.isDebugEnabled()) {
                log.debug("Cache " + cacheEntryInfo.getCacheKey() + " is not allowed to propagate to other clusters as per configurations.");
            }
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Sending cache invalidation message to other cluster nodes for '" + cacheEntryInfo.getCacheKey() + "' of the cache '" + cacheName + "' of the cache manager '" + cacheManagerName + "'");
        }
        sendAsyncInvalidation(new ClusterCacheInvalidationRequest(new ClusterCacheInvalidationRequest.CacheInfo(cacheManagerName, cacheName, cacheEntryInfo.getCacheKey()), tenantDomain, tenantId));
    }

    /**
     * Submits the request to the worker pool for publication.
     *
     * @param clusterCacheInvalidationRequest the request to publish
     */
    @SuppressFBWarnings
    public void sendAsyncInvalidation(ClusterCacheInvalidationRequest clusterCacheInvalidationRequest) {
        executorService.submit(() -> sendInvalidationMessage(clusterCacheInvalidationRequest));
    }

    /**
     * Publishes the request as a JMS text message, reconnecting first if the session is unusable.
     *
     * <p>JMS failures are logged rather than propagated. If no usable session can be obtained the
     * message is dropped and an error is logged.
     *
     * @param clusterCacheInvalidationRequest the request whose {@code toString()} form is sent
     */
    @SuppressFBWarnings
    void sendInvalidationMessage(ClusterCacheInvalidationRequest clusterCacheInvalidationRequest) {
        // The session and producer are shared by all pool threads, so sends are serialized.
        synchronized (this.connectionLock) {
            try {
                if (!isSessionValid(this.session)) {
                    // Drop the stale handles first; retryConnection() only reconnects when they are null.
                    closeResources();
                    retryConnection();
                }
                Session activeSession = this.session;
                MessageProducer activeProducer = this.producer;
                if (activeSession == null || activeProducer == null) {
                    log.error("JMS producer connection is not available. Cache invalidation message was not sent.");
                    return;
                }
                TextMessage message = activeSession.createTextMessage(clusterCacheInvalidationRequest.toString());
                String producerName = JMSUtils.getProducerName();
                if (StringUtils.isNotBlank(producerName)) {
                    message.setStringProperty(JMSUtils.SENDER, producerName);
                }
                activeProducer.send(message);
            } catch (JMSException e) {
                log.error("Something went wrong with JMS producer connection.", e);
            }
        }
    }

    /** Entry creation is intentionally not propagated. */
    @Override
    public void entryCreated(CacheEntryEvent cacheEntryEvent) throws CacheEntryListenerException {
    }

    /** Propagates an invalidation for the removed entry. */
    @Override
    public void entryRemoved(CacheEntryEvent cacheEntryEvent) throws CacheEntryListenerException {
        send(createCacheInfo(cacheEntryEvent));
    }

    /** Propagates an invalidation for the updated entry. */
    @Override
    public void entryUpdated(CacheEntryEvent cacheEntryEvent) throws CacheEntryListenerException {
        send(createCacheInfo(cacheEntryEvent));
    }

    /**
     * Builds a {@link CacheEntryInfo} from a cache event, taking the tenant domain and tenant id
     * from the thread-local carbon context.
     *
     * @param cacheEntryEvent the event to describe
     * @return the cache manager name, cache name, key and tenant details of the event
     */
    public static CacheEntryInfo createCacheInfo(CacheEntryEvent cacheEntryEvent) {
        PrivilegedCarbonContext carbonContext = PrivilegedCarbonContext.getThreadLocalCarbonContext();
        return new CacheEntryInfo(cacheEntryEvent.getSource().getCacheManager().getName(), cacheEntryEvent.getSource().getName(), cacheEntryEvent.getKey(), carbonContext.getTenantDomain(true), PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(true));
    }

    /**
     * Shuts the worker pool down, waiting up to 60 seconds for queued sends, and then closes the
     * JMS resources.
     */
    public void shutdownExecutorService() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60L, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            closeResources();
        }
    }

    /**
     * Closes the producer, session and connection and clears the references to them.
     *
     * <p>Each resource is closed independently so that a failure on one does not leave the
     * others open. Failures are logged, not thrown.
     */
    public void closeResources() {
        synchronized (this.connectionLock) {
            if (this.producer != null) {
                try {
                    this.producer.close();
                } catch (JMSException e) {
                    log.error("Error closing JMS resources.", e);
                }
            }
            if (this.session != null) {
                try {
                    this.session.close();
                } catch (JMSException e) {
                    log.error("Error closing JMS resources.", e);
                }
            }
            if (this.connection != null) {
                try {
                    this.connection.close();
                } catch (JMSException e) {
                    log.error("Error closing JMS resources.", e);
                }
            }
            // Clear the handles so a later retryConnection() knows it has to reconnect.
            this.producer = null;
            this.session = null;
            this.connection = null;
            this.topic = null;
        }
    }

    /**
     * Opens a connection, a non-transacted auto-acknowledge session, and a producer on the cache topic.
     *
     * <p>Any previously held resources are closed first, and a partially opened connection is
     * closed again if a later step fails.
     *
     * @throws JMSException if the connection, session or producer cannot be created
     * @throws NamingException if the topic lookup fails
     */
    void startConnection() throws JMSException, NamingException {
        synchronized (this.connectionLock) {
            // Avoid leaking an earlier connection when this is called more than once.
            closeResources();
            try {
                Connection newConnection = JMSUtils.createConnection(this.connectionFactory);
                this.connection = newConnection;
                newConnection.start();
                Session newSession = newConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                this.session = newSession;
                this.topic = JMSUtils.getCacheTopic(this.initialContext, newSession);
                this.producer = newSession.createProducer(this.topic);
            } catch (JMSException | NamingException | RuntimeException e) {
                // Do not leave a half-initialized connection behind.
                closeResources();
                throw e;
            }
        }
    }

    /**
     * Tries to (re)establish the connection until it succeeds or the failure limit is exceeded.
     *
     * <p>Does nothing when a connection and session are already present, or once the worker pool
     * has been shut down.
     */
    private void retryConnection() {
        synchronized (this.connectionLock) {
            if (executorService.isShutdown()) {
                // Nothing could be sent any more; do not reopen resources after shutdown.
                return;
            }
            int failedAttempts = 0;
            while ((this.connection == null || this.session == null) && failedAttempts <= MAX_FAILED_CONNECTION_ATTEMPTS) {
                log.debug("Attempting retry JMS Producer connection.");
                try {
                    startConnection();
                } catch (JMSException | NamingException e) {
                    failedAttempts++;
                    if (log.isDebugEnabled()) {
                        log.debug("JMS Producer connection attempt " + failedAttempts + " failed.", e);
                    }
                }
            }
        }
    }

    /**
     * Probes the session by calling {@code getTransacted()}.
     *
     * @param session the session to probe, may be {@code null}
     * @return {@code false} if the session is {@code null} or the probe throws a {@link JMSException}
     */
    private boolean isSessionValid(Session session) {
        if (session == null) {
            log.debug("JMS session is expired or invalid.");
            return false;
        }
        try {
            session.getTransacted();
            return true;
        } catch (JMSException e) {
            log.debug("JMS session is expired or invalid.");
            return false;
        }
    }
}
