package org.wso2.carbon.caching.invalidator.amqp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import javax.cache.CacheManager;
import javax.cache.Caching;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.caching.invalidator.internal.CacheInvalidationDataHolder;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.core.clustering.api.CoordinatedActivity;

/* loaded from: input_file:org/wso2/carbon/caching/invalidator/amqp/CacheInvalidationSubscriber.class */
public class CacheInvalidationSubscriber implements CoordinatedActivity {
    private static final Log log = LogFactory.getLog(CacheInvalidationSubscriber.class);
    private QueueingConsumer consumer = null;
    private Runnable messageReciever = new Runnable() { // from class: org.wso2.carbon.caching.invalidator.amqp.CacheInvalidationSubscriber.1
        @Override // java.lang.Runnable
        public void run() {
            while (CacheInvalidationSubscriber.this.consumer != null) {
                try {
                    CacheInvalidationSubscriber.this.onMessage(CacheInvalidationSubscriber.this.consumer.nextDelivery().getBody());
                } catch (Exception e) {
                    CacheInvalidationSubscriber.log.error("Global cache invalidation: error message recieve.", e);
                }
            }
        }
    };

    public CacheInvalidationSubscriber() {
        if (CacheInvalidationDataHolder.getConfigContext() == null || !CacheInvalidationDataHolder.getConfigContext().getAxisConfiguration().getClusteringAgent().isCoordinator() || ConfigurationManager.isSubscribed()) {
            return;
        }
        subscribe();
        ConfigurationManager.setSubscribed(true);
    }

    public void execute() {
        if (!ConfigurationManager.init() || CacheInvalidationDataHolder.getConfigContext() == null) {
            return;
        }
        boolean isCoordinator = CacheInvalidationDataHolder.getConfigContext().getAxisConfiguration().getClusteringAgent().isCoordinator();
        if (isCoordinator && !ConfigurationManager.isSubscribed()) {
            subscribe();
            ConfigurationManager.setSubscribed(true);
        }
        if (isCoordinator || !ConfigurationManager.isSubscribed()) {
            return;
        }
        ConfigurationManager.setSubscribed(false);
    }

    private void subscribe() {
        log.debug("Global cache invalidation: initializing the subscription");
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(ConfigurationManager.getProviderUrl());
            connectionFactory.setPort(Integer.parseInt(ConfigurationManager.getProviderPort()));
            connectionFactory.setUsername(ConfigurationManager.getProviderUsername());
            connectionFactory.setPassword(ConfigurationManager.getProviderPassword());
            Channel createChannel = connectionFactory.newConnection().createChannel();
            createChannel.exchangeDeclare(ConfigurationManager.getTopicName(), "topic");
            String queue = createChannel.queueDeclare().getQueue();
            createChannel.queueBind(queue, ConfigurationManager.getTopicName(), "#");
            this.consumer = new QueueingConsumer(createChannel);
            createChannel.basicConsume(queue, true, this.consumer);
            new Thread(this.messageReciever).start();
            log.info("Global cache invalidation is online");
        } catch (Exception e) {
            log.error("Global cache invalidation: Error message broker initialization", e);
        }
    }

    public void onMessage(byte[] bArr) {
        log.debug("Cache invalidation message received: " + new String(bArr));
        boolean z = false;
        if (CacheInvalidationDataHolder.getConfigContext() != null) {
            z = CacheInvalidationDataHolder.getConfigContext().getAxisConfiguration().getClusteringAgent().isCoordinator();
        }
        if (z) {
            PrivilegedCarbonContext.startTenantFlow();
            try {
                log.debug("Global cache invalidation: deserializing data to object");
                GlobalCacheInvalidationEvent globalCacheInvalidationEvent = (GlobalCacheInvalidationEvent) deserialize(bArr);
                log.debug("Global cache invalidation: deserializing complete");
                if (ConfigurationManager.getSentMsgBuffer().contains(globalCacheInvalidationEvent.getUuid().trim())) {
                    ConfigurationManager.getSentMsgBuffer().remove(globalCacheInvalidationEvent.getUuid().trim());
                    log.debug("Global cache invalidation: own message ignored");
                } else {
                    PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(globalCacheInvalidationEvent.getTenantId(), true);
                    CacheManager cacheManager = Caching.getCacheManagerFactory().getCacheManager(globalCacheInvalidationEvent.getCacheManagerName());
                    if (cacheManager == null) {
                        log.error("Global cache invalidation: error cache manager is null");
                    } else if (cacheManager.getCache(globalCacheInvalidationEvent.getCacheName()) != null) {
                        cacheManager.getCache(globalCacheInvalidationEvent.getCacheName()).remove(globalCacheInvalidationEvent.getCacheKey());
                        log.debug("Global cache invalidated: " + globalCacheInvalidationEvent.getCacheKey());
                    } else {
                        log.error("Global cache invalidation: error cache is null");
                    }
                }
            } catch (Exception e) {
                log.error("Global cache invalidation: error local cache update", e);
            } finally {
                PrivilegedCarbonContext.endTenantFlow();
            }
        }
    }

    private Object deserialize(byte[] bArr) throws Exception {
        return new ObjectInputStream(new ByteArrayInputStream(bArr)).readObject();
    }
}
