package org.wso2.carbon.event.processor.core.internal.storm.status.monitor;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import java.net.SocketException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.thrift.utils.HostAddressFinder;
import org.wso2.carbon.event.processor.core.internal.ds.EventProcessorValueHolder;
import org.wso2.carbon.event.processor.core.internal.storm.StormTopologyManager;
import org.wso2.carbon.event.processor.core.internal.storm.status.monitor.exception.DeploymentStatusMonitorException;
import org.wso2.carbon.event.processor.core.util.DistributedModeConstants;
import org.wso2.carbon.event.processor.core.util.ExecutionPlanStatusHolder;
import org.wso2.carbon.event.processor.manager.commons.transport.server.ConnectionCallback;

/* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/status/monitor/StormStatusMonitor.class */
public class StormStatusMonitor implements ConnectionCallback {
    private static final Log log = LogFactory.getLog(StormStatusMonitor.class);
    private final String stormTopologyName;
    private final String executionPlanName;
    private final String executionPlanStatusHolderKey;
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private final int lockTimeout;
    private final String tenantDomain;
    private String hostIp;
    private AtomicInteger connectedCepReceiversCount;
    private int importedStreamsCount;
    private AtomicInteger connectedPublisherBoltsCount;

    /* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/status/monitor/StormStatusMonitor$GlobalStatUpdater.class */
    class GlobalStatUpdater implements Runnable {
        private final int updateRate = EventProcessorValueHolder.getStormDeploymentConfiguration().getStatusUpdateInterval();

        GlobalStatUpdater() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
                if (hazelcastInstance == null || !hazelcastInstance.getLifecycleService().isRunning()) {
                    StormStatusMonitor.log.error("Couldn't update distributed deployment status for execution plan: " + StormStatusMonitor.this.executionPlanName + ", for tenant-domain: " + StormStatusMonitor.this.tenantDomain + " as the hazelcast instance is not active or not available.");
                } else {
                    IMap map = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
                    try {
                        if (StormStatusMonitor.this.hostIp == null) {
                            StormStatusMonitor.this.hostIp = HostAddressFinder.findAddress("localhost");
                        }
                        if (map.tryLock(StormStatusMonitor.this.executionPlanStatusHolderKey, StormStatusMonitor.this.lockTimeout, TimeUnit.MILLISECONDS)) {
                            try {
                                ExecutionPlanStatusHolder executionPlanStatusHolder = (ExecutionPlanStatusHolder) map.get(StormStatusMonitor.this.stormTopologyName);
                                if (executionPlanStatusHolder == null) {
                                    StormStatusMonitor.log.error("Couldn't update distributed deployment status for execution plan: " + StormStatusMonitor.this.executionPlanName + ", for tenant-domain: " + StormStatusMonitor.this.tenantDomain + " as status object not initialized by manager.");
                                } else {
                                    executionPlanStatusHolder.setCEPReceiverStatus(StormStatusMonitor.this.hostIp, StormStatusMonitor.this.connectedCepReceiversCount.get(), StormStatusMonitor.this.importedStreamsCount);
                                    executionPlanStatusHolder.setConnectedPublisherBoltsCount(StormStatusMonitor.this.hostIp, StormStatusMonitor.this.connectedPublisherBoltsCount.get());
                                    map.replace(StormStatusMonitor.this.stormTopologyName, executionPlanStatusHolder);
                                    if (StormStatusMonitor.log.isDebugEnabled()) {
                                        StormStatusMonitor.log.debug("Updated distributed deployment status as follows. \nConnected CEP receivers count: " + StormStatusMonitor.this.connectedCepReceiversCount.get() + "\nConnected publisher bolts count: " + StormStatusMonitor.this.connectedPublisherBoltsCount.get() + "\nfor execution plan: " + StormStatusMonitor.this.executionPlanName + ", for tenant-domain: " + StormStatusMonitor.this.tenantDomain + ", for IP address: " + StormStatusMonitor.this.hostIp);
                                    }
                                }
                                map.unlock(StormStatusMonitor.this.executionPlanStatusHolderKey);
                            } catch (Throwable th) {
                                map.unlock(StormStatusMonitor.this.executionPlanStatusHolderKey);
                                throw th;
                                break;
                            }
                        } else {
                            StormStatusMonitor.log.error("Couldn't update distributed deployment status for execution plan: " + StormStatusMonitor.this.executionPlanName + ", for tenant-domain: " + StormStatusMonitor.this.tenantDomain + " as the hazelcast lock acquisition failed.");
                        }
                    } catch (InterruptedException e) {
                        StormStatusMonitor.log.error("Couldn't update distributed deployment status for execution plan: " + StormStatusMonitor.this.executionPlanName + ", for tenant-domain: " + StormStatusMonitor.this.tenantDomain + " as the hazelcast lock acquisition was interrupted.", e);
                        Thread.currentThread().interrupt();
                        return;
                    } catch (SocketException e2) {
                        StormStatusMonitor.log.error("Couldn't update distributed deployment status for execution plan: " + StormStatusMonitor.this.executionPlanName + ", for tenant-domain: " + StormStatusMonitor.this.tenantDomain + " as the host IP couldn't be found for this node.", e2);
                    }
                }
                try {
                    Thread.sleep(this.updateRate);
                } catch (InterruptedException e3) {
                    Thread.currentThread().interrupt();
                    if (StormStatusMonitor.log.isDebugEnabled()) {
                        StormStatusMonitor.log.debug("GlobalStatUpdater was interrupted, hence returning. Details: execution plan name: " + StormStatusMonitor.this.executionPlanName + ", tenant domain: " + StormStatusMonitor.this.tenantDomain);
                        return;
                    }
                    return;
                }
            }
        }
    }

    public StormStatusMonitor(int i, String str, int i2) throws DeploymentStatusMonitorException {
        this.hostIp = null;
        this.importedStreamsCount = 0;
        if (EventProcessorValueHolder.getHazelcastInstance() == null) {
            throw new DeploymentStatusMonitorException("Couldn't initialize Distributed Deployment Status monitor as the hazelcast instance is null. Enable clustering and restart the server");
        }
        this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
        this.connectedCepReceiversCount = new AtomicInteger(0);
        this.connectedPublisherBoltsCount = new AtomicInteger(0);
        try {
            this.hostIp = HostAddressFinder.findAddress("localhost");
        } catch (SocketException e) {
        }
        this.importedStreamsCount = i2;
        this.executionPlanName = str;
        this.stormTopologyName = StormTopologyManager.getTopologyName(str, i);
        this.executionPlanStatusHolderKey = "org.wso2.cep.org.wso2.carbon.event.processor.core.storm.status.execution.plan.ui." + this.stormTopologyName;
        this.lockTimeout = EventProcessorValueHolder.getStormDeploymentConfiguration().getStatusLockTimeout();
        this.executorService.execute(new GlobalStatUpdater());
    }

    /* JADX WARN: Finally extract failed */
    public void onCepReceiverConnect() {
        HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
        if (hazelcastInstance == null || !hazelcastInstance.getLifecycleService().isRunning()) {
            log.error("Couldn't increment connected CEP receivers count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast instance is not active or not available.");
            return;
        }
        IMap map = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
        try {
            if (this.hostIp == null) {
                this.hostIp = HostAddressFinder.findAddress("localhost");
            }
            if (map.tryLock(this.executionPlanStatusHolderKey, this.lockTimeout, TimeUnit.MILLISECONDS)) {
                try {
                    ExecutionPlanStatusHolder executionPlanStatusHolder = (ExecutionPlanStatusHolder) map.get(this.stormTopologyName);
                    if (executionPlanStatusHolder == null) {
                        log.error("Couldn't increment connected CEP receivers count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as status object not initialized by manager.");
                    } else {
                        executionPlanStatusHolder.setCEPReceiverStatus(this.hostIp, this.connectedCepReceiversCount.incrementAndGet(), this.importedStreamsCount);
                        map.replace(this.stormTopologyName, executionPlanStatusHolder);
                        if (log.isDebugEnabled()) {
                            log.debug("Incremented connected CEP receiver count as " + this.connectedCepReceiversCount.get() + " for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + ", for IP address: " + this.hostIp);
                        }
                    }
                    map.unlock(this.executionPlanStatusHolderKey);
                } catch (Throwable th) {
                    map.unlock(this.executionPlanStatusHolderKey);
                    throw th;
                }
            } else {
                log.error("Couldn't increment connected CEP receivers count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast lock acquisition failed.");
            }
        } catch (InterruptedException e) {
            log.error("Couldn't increment connected CEP receivers count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast lock acquisition was interrupted.", e);
            Thread.currentThread().interrupt();
        } catch (SocketException e2) {
            log.error("Couldn't increment connected CEP receivers count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the host IP couldn't be found for this node.", e2);
        }
    }

    /* JADX WARN: Finally extract failed */
    public void onCepReceiverDisconnect() {
        HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
        if (hazelcastInstance == null || !hazelcastInstance.getLifecycleService().isRunning()) {
            log.error("Couldn't decrement connected CEP receivers count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast instance is not active or not available.");
            return;
        }
        IMap map = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
        try {
            if (this.hostIp == null) {
                this.hostIp = HostAddressFinder.findAddress("localhost");
            }
            if (map.tryLock(this.executionPlanStatusHolderKey, this.lockTimeout, TimeUnit.MILLISECONDS)) {
                try {
                    ExecutionPlanStatusHolder executionPlanStatusHolder = (ExecutionPlanStatusHolder) map.get(this.stormTopologyName);
                    if (executionPlanStatusHolder == null) {
                        log.error("Couldn't decrement connected CEP receivers count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as status object not initialized by manager.");
                    } else {
                        executionPlanStatusHolder.setCEPReceiverStatus(this.hostIp, this.connectedCepReceiversCount.decrementAndGet(), this.importedStreamsCount);
                        map.replace(this.stormTopologyName, executionPlanStatusHolder);
                        if (log.isDebugEnabled()) {
                            log.debug("Decremented connected CEP receiver count as " + this.connectedCepReceiversCount.get() + " for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + ", for IP address: " + this.hostIp);
                        }
                    }
                    map.unlock(this.executionPlanStatusHolderKey);
                } catch (Throwable th) {
                    map.unlock(this.executionPlanStatusHolderKey);
                    throw th;
                }
            } else {
                log.error("Couldn't decrement connected CEP receivers count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast lock acquisition failed.");
            }
        } catch (InterruptedException e) {
            log.error("Couldn't decrement connected CEP receivers count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast lock acquisition was interrupted.", e);
            Thread.currentThread().interrupt();
        } catch (SocketException e2) {
            log.error("Couldn't decrement connected CEP receivers count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the host IP couldn't be found for this node.", e2);
        }
    }

    /* JADX WARN: Finally extract failed */
    public void onPublisherBoltConnect() {
        HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
        if (hazelcastInstance == null || !hazelcastInstance.getLifecycleService().isRunning()) {
            log.error("Couldn't increment connected publisher bolts count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast instance is not active or not available.");
            return;
        }
        IMap map = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
        try {
            if (this.hostIp == null) {
                this.hostIp = HostAddressFinder.findAddress("localhost");
            }
            if (map.tryLock(this.executionPlanStatusHolderKey, this.lockTimeout, TimeUnit.MILLISECONDS)) {
                try {
                    ExecutionPlanStatusHolder executionPlanStatusHolder = (ExecutionPlanStatusHolder) map.get(this.stormTopologyName);
                    if (executionPlanStatusHolder == null) {
                        log.error("Couldn't increment connected publisher bolts count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as status object not initialized by manager.");
                    } else {
                        executionPlanStatusHolder.setConnectedPublisherBoltsCount(this.hostIp, this.connectedPublisherBoltsCount.incrementAndGet());
                        map.replace(this.stormTopologyName, executionPlanStatusHolder);
                        if (log.isDebugEnabled()) {
                            log.debug("Incremented connected publisher bolt count as " + this.connectedPublisherBoltsCount.get() + " for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + ", for IP address: " + this.hostIp);
                        }
                    }
                    map.unlock(this.executionPlanStatusHolderKey);
                } catch (Throwable th) {
                    map.unlock(this.executionPlanStatusHolderKey);
                    throw th;
                }
            } else {
                log.error("Couldn't increment connected publisher bolts count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast lock acquisition failed.");
            }
        } catch (InterruptedException e) {
            log.error("Couldn't increment connected publisher bolts count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast lock acquisition was interrupted.", e);
            Thread.currentThread().interrupt();
        } catch (SocketException e2) {
            log.error("Couldn't increment connected publisher bolts count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the host IP couldn't be found for this node.", e2);
        }
    }

    /* JADX WARN: Finally extract failed */
    public void onPublisherBoltDisconnect() {
        HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
        if (hazelcastInstance == null || !hazelcastInstance.getLifecycleService().isRunning()) {
            log.error("Couldn't decrement connected publisher bolts count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast instance is not active or not available.");
            return;
        }
        IMap map = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
        try {
            if (this.hostIp == null) {
                this.hostIp = HostAddressFinder.findAddress("localhost");
            }
            if (map.tryLock(this.executionPlanStatusHolderKey, this.lockTimeout, TimeUnit.MILLISECONDS)) {
                try {
                    ExecutionPlanStatusHolder executionPlanStatusHolder = (ExecutionPlanStatusHolder) map.get(this.stormTopologyName);
                    if (executionPlanStatusHolder == null) {
                        log.error("Couldn't decrement connected publisher bolts count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as status object not initialized by manager.");
                    } else {
                        executionPlanStatusHolder.setConnectedPublisherBoltsCount(this.hostIp, this.connectedPublisherBoltsCount.decrementAndGet());
                        map.replace(this.stormTopologyName, executionPlanStatusHolder);
                        if (log.isDebugEnabled()) {
                            log.debug("Decremented connected publisher bolt count as " + this.connectedPublisherBoltsCount.get() + " for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + ", for IP address: " + this.hostIp);
                        }
                    }
                    map.unlock(this.executionPlanStatusHolderKey);
                } catch (Throwable th) {
                    map.unlock(this.executionPlanStatusHolderKey);
                    throw th;
                }
            } else {
                log.error("Couldn't decrement connected publisher bolts count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast lock acquisition failed.");
            }
        } catch (InterruptedException e) {
            log.error("Couldn't decrement connected publisher bolts count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast lock acquisition was interrupted.", e);
            Thread.currentThread().interrupt();
        } catch (SocketException e2) {
            log.error("Couldn't decrement connected publisher bolts count for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the host IP couldn't be found for this node.", e2);
        }
    }

    /* JADX WARN: Finally extract failed */
    public void hazelcastListenerCallback() {
        HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
        if (hazelcastInstance == null || !hazelcastInstance.getLifecycleService().isRunning()) {
            log.error("Couldn't update distributed deployment status for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast instance is not active or not available.");
            return;
        }
        IMap map = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
        try {
            if (this.hostIp == null) {
                this.hostIp = HostAddressFinder.findAddress("localhost");
            }
            if (map.tryLock(this.executionPlanStatusHolderKey, this.lockTimeout, TimeUnit.MILLISECONDS)) {
                try {
                    ExecutionPlanStatusHolder executionPlanStatusHolder = (ExecutionPlanStatusHolder) map.get(this.stormTopologyName);
                    if (executionPlanStatusHolder == null) {
                        log.error("Couldn't update distributed deployment status for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as status object not initialized by manager.");
                    } else {
                        executionPlanStatusHolder.setCEPReceiverStatus(this.hostIp, this.connectedCepReceiversCount.get(), this.importedStreamsCount);
                        executionPlanStatusHolder.setConnectedPublisherBoltsCount(this.hostIp, this.connectedPublisherBoltsCount.get());
                        map.replace(this.stormTopologyName, executionPlanStatusHolder);
                        if (log.isDebugEnabled()) {
                            log.debug("Updated distributed deployment status as follows. \nConnected CEP receivers count: " + this.connectedCepReceiversCount.get() + "\nConnected publisher bolts count: " + this.connectedPublisherBoltsCount.get() + "\nfor execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + ", for IP address: " + this.hostIp);
                        }
                    }
                    map.unlock(this.executionPlanStatusHolderKey);
                } catch (Throwable th) {
                    map.unlock(this.executionPlanStatusHolderKey);
                    throw th;
                }
            } else {
                log.error("Couldn't update distributed deployment status for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast lock acquisition failed.");
            }
        } catch (InterruptedException e) {
            log.error("Couldn't update distributed deployment status for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the hazelcast lock acquisition was interrupted.", e);
            Thread.currentThread().interrupt();
        } catch (SocketException e2) {
            log.error("Couldn't update distributed deployment status for execution plan: " + this.executionPlanName + ", for tenant-domain: " + this.tenantDomain + " as the host IP couldn't be found for this node.", e2);
        }
    }

    public void shutdown() {
        this.executorService.shutdownNow();
    }
}
