package org.wso2.carbon.event.processor.manager.core.internal;

import com.hazelcast.cluster.MembershipEvent;
import com.hazelcast.cluster.MembershipListener;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.event.processor.manager.core.EventManagementService;
import org.wso2.carbon.event.processor.manager.core.EventProcessorManagementService;
import org.wso2.carbon.event.processor.manager.core.EventPublisherManagementService;
import org.wso2.carbon.event.processor.manager.core.EventReceiverManagementService;
import org.wso2.carbon.event.processor.manager.core.EventSync;
import org.wso2.carbon.event.processor.manager.core.Manager;
import org.wso2.carbon.event.processor.manager.core.config.DistributedConfiguration;
import org.wso2.carbon.event.processor.manager.core.config.HAConfiguration;
import org.wso2.carbon.event.processor.manager.core.config.ManagementModeInfo;
import org.wso2.carbon.event.processor.manager.core.config.Mode;
import org.wso2.carbon.event.processor.manager.core.config.PersistenceConfiguration;
import org.wso2.carbon.event.processor.manager.core.exception.EventManagementException;
import org.wso2.carbon.event.processor.manager.core.exception.ManagementConfigurationException;
import org.wso2.carbon.event.processor.manager.core.internal.ds.EventManagementServiceValueHolder;
import org.wso2.carbon.event.processor.manager.core.internal.util.ConfigurationConstants;
import org.wso2.carbon.event.processor.manager.core.internal.util.ManagementModeConfigurationLoader;
import org.wso2.carbon.utils.ConfigurationContextService;

/* loaded from: input_file:org/wso2/carbon/event/processor/manager/core/internal/CarbonEventManagementService.class */
public class CarbonEventManagementService implements EventManagementService {
    private static Log log = LogFactory.getLog(CarbonEventManagementService.class);
    private Mode mode;
    private ManagementModeInfo managementModeInfo;
    private EventProcessorManagementService processorManager;
    private EventReceiverManagementService receiverManager;
    private List<EventPublisherManagementService> publisherManager;
    private EventHandler receiverEventHandler = new EventHandler();
    private EventHandler presenterEventHandler = new EventHandler();
    private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(3);
    private HAManager haManager = null;
    private IMap<String, Long> haEventPublisherTimeSyncMap = null;
    private PersistenceManager persistenceManager;
    private StormReceiverCoordinator stormReceiverCoordinator;
    private boolean isManagerNode;
    private boolean isWorkerNode;
    private boolean isPresenterNode;

    public CarbonEventManagementService() {
        this.mode = Mode.SingleNode;
        this.persistenceManager = null;
        this.stormReceiverCoordinator = null;
        this.isManagerNode = false;
        this.isWorkerNode = false;
        this.isPresenterNode = false;
        try {
            this.managementModeInfo = ManagementModeConfigurationLoader.loadManagementModeInfo();
            this.mode = this.managementModeInfo.getMode();
            this.publisherManager = new CopyOnWriteArrayList();
            if (this.mode == Mode.HA) {
                HAConfiguration haConfiguration = this.managementModeInfo.getHaConfiguration();
                this.isWorkerNode = haConfiguration.isWorkerNode();
                this.isPresenterNode = haConfiguration.isPresenterNode();
                if (this.isWorkerNode) {
                    PersistenceConfiguration persistenceConfiguration = this.managementModeInfo.getPersistenceConfiguration();
                    if (persistenceConfiguration != null) {
                        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(persistenceConfiguration.getThreadPoolSize());
                        long persistenceTimeInterval = persistenceConfiguration.getPersistenceTimeInterval();
                        if (persistenceTimeInterval > 0) {
                            this.persistenceManager = new PersistenceManager(newScheduledThreadPool, persistenceTimeInterval);
                        }
                    }
                    this.receiverEventHandler.startServer(haConfiguration.getEventSyncConfig());
                }
                if (!this.isPresenterNode || this.isWorkerNode) {
                    return;
                }
                this.presenterEventHandler.startServer(haConfiguration.getLocalPresenterConfig());
                return;
            }
            if (this.mode == Mode.SingleNode) {
                PersistenceConfiguration persistenceConfiguration2 = this.managementModeInfo.getPersistenceConfiguration();
                if (persistenceConfiguration2 != null) {
                    ScheduledExecutorService newScheduledThreadPool2 = Executors.newScheduledThreadPool(persistenceConfiguration2.getThreadPoolSize());
                    long persistenceTimeInterval2 = persistenceConfiguration2.getPersistenceTimeInterval();
                    if (persistenceTimeInterval2 > 0) {
                        this.persistenceManager = new PersistenceManager(newScheduledThreadPool2, persistenceTimeInterval2);
                        this.persistenceManager.init();
                        return;
                    }
                    return;
                }
                return;
            }
            if (this.mode == Mode.Distributed) {
                DistributedConfiguration distributedConfiguration = this.managementModeInfo.getDistributedConfiguration();
                this.isManagerNode = distributedConfiguration.isManagerNode();
                this.isWorkerNode = distributedConfiguration.isWorkerNode();
                if (this.isWorkerNode) {
                    this.stormReceiverCoordinator = new StormReceiverCoordinator();
                }
                this.isPresenterNode = distributedConfiguration.isPresenterNode();
                if (this.isPresenterNode) {
                    this.presenterEventHandler.startServer(distributedConfiguration.getLocalPresenterConfig());
                }
            }
        } catch (ManagementConfigurationException e) {
            throw new EventManagementException("Error getting management mode information", e);
        }
    }

    public void init(HazelcastInstance hazelcastInstance) {
        if (this.mode == Mode.HA) {
            HAConfiguration haConfiguration = this.managementModeInfo.getHaConfiguration();
            if (this.isWorkerNode) {
                if (!validateHostName(haConfiguration.getEventSyncConfig().getHostName())) {
                    log.error("Hostname : " + haConfiguration.getEventSyncConfig().getHostName() + " defined for eventSync configuration is invalid. Please add proper IP address of the node. This IP address is used by other nodes in the cluster to communicate.");
                }
                if (!validateHostName(haConfiguration.getManagementConfig().getHostName())) {
                    log.error("Hostname : " + haConfiguration.getManagementConfig().getHostName() + " defined for management configuration is invalid. Please add proper IP address of the node. This IP address is used by other nodes in the cluster to communicate.");
                }
                this.receiverEventHandler.init(ConfigurationConstants.RECEIVERS, haConfiguration.getEventSyncConfig(), haConfiguration.constructEventSyncPublisherConfig(), this.isWorkerNode);
                this.haManager = new HAManager(hazelcastInstance, haConfiguration, this.executorService, this.receiverEventHandler, this.presenterEventHandler);
                this.haManager.init();
                if (this.haEventPublisherTimeSyncMap == null) {
                    this.haEventPublisherTimeSyncMap = EventManagementServiceValueHolder.getHazelcastInstance().getMap(ConfigurationConstants.HA_EVENT_PUBLISHER_TIME_SYNC_MAP);
                }
            }
            if (this.isPresenterNode && !validateHostName(haConfiguration.getLocalPresenterConfig().getHostName())) {
                log.error("Hostname : " + haConfiguration.getLocalPresenterConfig().getHostName() + " defined for presentation purpose is invalid. Please add proper IP address of the node. This IP address is used by other nodes in the cluster to communicate.");
            }
            this.presenterEventHandler.init(ConfigurationConstants.PRESENTERS, haConfiguration.getLocalPresenterConfig(), haConfiguration.constructPresenterPublisherConfig(), this.isPresenterNode && !this.isWorkerNode);
            checkMemberUpdate();
        } else if (this.mode == Mode.Distributed) {
            if (this.stormReceiverCoordinator != null) {
                this.stormReceiverCoordinator.tryBecomeCoordinator();
            }
            DistributedConfiguration distributedConfiguration = this.managementModeInfo.getDistributedConfiguration();
            this.presenterEventHandler.init(ConfigurationConstants.PRESENTERS, distributedConfiguration.getLocalPresenterConfig(), distributedConfiguration.constructPresenterPublisherConfig(), this.isPresenterNode);
            checkMemberUpdate();
        } else if (this.mode == Mode.SingleNode) {
            log.warn("CEP started with clustering enabled, but SingleNode configuration given.");
        }
        hazelcastInstance.getCluster().addMembershipListener(new MembershipListener() { // from class: org.wso2.carbon.event.processor.manager.core.internal.CarbonEventManagementService.1
            public void memberAdded(MembershipEvent membershipEvent) {
                CarbonEventManagementService.this.presenterEventHandler.registerLocalMember();
                CarbonEventManagementService.this.receiverEventHandler.registerLocalMember();
                CarbonEventManagementService.this.checkMemberUpdate();
                if (CarbonEventManagementService.this.mode == Mode.HA && CarbonEventManagementService.this.isWorkerNode && CarbonEventManagementService.this.haManager != null) {
                    CarbonEventManagementService.this.haManager.verifyState();
                }
            }

            public void memberRemoved(MembershipEvent membershipEvent) {
                CarbonEventManagementService.this.receiverEventHandler.removeMember(membershipEvent.getMember().getUuid().toString());
                CarbonEventManagementService.this.presenterEventHandler.removeMember(membershipEvent.getMember().getUuid().toString());
                CarbonEventManagementService.this.checkMemberUpdate();
                if (CarbonEventManagementService.this.mode == Mode.HA) {
                    if (!CarbonEventManagementService.this.isWorkerNode || CarbonEventManagementService.this.haManager == null) {
                        return;
                    }
                    CarbonEventManagementService.this.haManager.tryChangeState();
                    return;
                }
                if (CarbonEventManagementService.this.mode != Mode.Distributed || CarbonEventManagementService.this.stormReceiverCoordinator == null) {
                    return;
                }
                CarbonEventManagementService.this.stormReceiverCoordinator.tryBecomeCoordinator();
            }
        });
    }

    private boolean validateHostName(String str) {
        return (str.trim().equals("0.0.0.0") || str.trim().equals("localhost") || str.trim().equals("127.0.0.1") || str.trim().equals("::1")) ? false : true;
    }

    public void init(ConfigurationContextService configurationContextService) {
        if ((this.mode == Mode.SingleNode || this.isWorkerNode) && this.receiverManager != null) {
            this.receiverManager.start();
        }
        if ((((this.mode == Mode.Distributed || this.mode == Mode.HA) && this.isWorkerNode) || this.mode == Mode.SingleNode) && this.receiverManager != null) {
            this.executorService.schedule(new Runnable() { // from class: org.wso2.carbon.event.processor.manager.core.internal.CarbonEventManagementService.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        CarbonEventManagementService.log.info("Starting polling event receivers");
                        EventReceiverManagementService eventReceiverManagementService = CarbonEventManagementService.this.getEventReceiverManagementService();
                        if (eventReceiverManagementService != null) {
                            eventReceiverManagementService.startPolling();
                        } else {
                            CarbonEventManagementService.log.error("Adapter polling failed as EventReceiverManagementService not available");
                        }
                    } catch (Exception e) {
                        CarbonEventManagementService.log.error("Unexpected error occurred when start polling event adapters", e);
                    }
                }
            }, 40000L, TimeUnit.MILLISECONDS);
        }
        int i = 10000;
        if (this.mode == Mode.Distributed) {
            i = this.managementModeInfo.getDistributedConfiguration().getMemberUpdateCheckInterval();
        } else if (this.mode == Mode.HA) {
            i = this.managementModeInfo.getHaConfiguration().getCheckMemberUpdateInterval();
        }
        if (this.mode == Mode.Distributed || this.mode == Mode.HA) {
            this.executorService.scheduleAtFixedRate(new Runnable() { // from class: org.wso2.carbon.event.processor.manager.core.internal.CarbonEventManagementService.3
                @Override // java.lang.Runnable
                public void run() {
                    CarbonEventManagementService.this.checkMemberUpdate();
                }
            }, i, i, TimeUnit.MILLISECONDS);
        }
    }

    public void shutdown() {
        if (this.haManager != null) {
            this.haManager.shutdown();
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        if (this.persistenceManager != null) {
            this.persistenceManager.shutdown();
        }
        this.receiverEventHandler.shutdown();
        this.presenterEventHandler.shutdown();
    }

    public byte[] getState() {
        if (this.mode == Mode.HA && this.isWorkerNode) {
            return this.haManager.getState();
        }
        return null;
    }

    @Override // org.wso2.carbon.event.processor.manager.core.EventManagementService
    public ManagementModeInfo getManagementModeInfo() {
        return this.managementModeInfo;
    }

    @Override // org.wso2.carbon.event.processor.manager.core.EventManagementService
    public void subscribe(Manager manager) {
        if (manager.getType() == Manager.ManagerType.Processor) {
            this.processorManager = (EventProcessorManagementService) manager;
        } else if (manager.getType() == Manager.ManagerType.Receiver) {
            this.receiverManager = (EventReceiverManagementService) manager;
        } else if (manager.getType() == Manager.ManagerType.Publisher) {
            this.publisherManager.add((EventPublisherManagementService) manager);
        }
    }

    @Override // org.wso2.carbon.event.processor.manager.core.EventManagementService
    public void unsubscribe(Manager manager) {
        if (manager.getType() == Manager.ManagerType.Processor) {
            this.processorManager = null;
        } else if (manager.getType() == Manager.ManagerType.Receiver) {
            this.receiverManager = null;
        } else if (manager.getType() == Manager.ManagerType.Publisher) {
            this.publisherManager.remove(manager);
        }
    }

    @Override // org.wso2.carbon.event.processor.manager.core.EventManagementService
    public void syncEvent(String str, Manager.ManagerType managerType, Event event) {
        if (managerType == Manager.ManagerType.Receiver) {
            this.receiverEventHandler.syncEvent(str, event);
        } else {
            this.presenterEventHandler.syncEvent(str, event);
        }
    }

    @Override // org.wso2.carbon.event.processor.manager.core.EventManagementService
    public void registerEventSync(EventSync eventSync, Manager.ManagerType managerType) {
        if (managerType == Manager.ManagerType.Receiver) {
            this.receiverEventHandler.registerEventSync(eventSync);
        } else {
            this.presenterEventHandler.registerEventSync(eventSync);
        }
    }

    @Override // org.wso2.carbon.event.processor.manager.core.EventManagementService
    public void unregisterEventSync(String str, Manager.ManagerType managerType) {
        if (managerType == Manager.ManagerType.Receiver) {
            this.receiverEventHandler.unregisterEventSync(str);
        } else {
            this.presenterEventHandler.unregisterEventSync(str);
        }
    }

    public EventProcessorManagementService getEventProcessorManagementService() {
        return this.processorManager;
    }

    public EventReceiverManagementService getEventReceiverManagementService() {
        return this.receiverManager;
    }

    public List<EventPublisherManagementService> getEventPublisherManagementService() {
        return this.publisherManager;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkMemberUpdate() {
        if (this.isWorkerNode) {
            if (this.mode == Mode.HA) {
                this.receiverEventHandler.checkMemberUpdate();
                this.presenterEventHandler.checkMemberUpdate();
            } else if (this.mode == Mode.Distributed) {
                this.presenterEventHandler.checkMemberUpdate();
            }
        }
    }

    @Override // org.wso2.carbon.event.processor.manager.core.EventManagementService
    public void updateLatestEventSentTime(String str, int i, long j) {
        this.haEventPublisherTimeSyncMap.putAsync(i + "-" + str, Long.valueOf(EventManagementServiceValueHolder.getHazelcastInstance().getCluster().getClusterTime()));
    }

    @Override // org.wso2.carbon.event.processor.manager.core.EventManagementService
    public long getLatestEventSentTime(String str, int i) {
        if (this.haEventPublisherTimeSyncMap == null) {
            this.haEventPublisherTimeSyncMap = EventManagementServiceValueHolder.getHazelcastInstance().getMap(ConfigurationConstants.HA_EVENT_PUBLISHER_TIME_SYNC_MAP);
        }
        Long l = (Long) this.haEventPublisherTimeSyncMap.get(i + "-" + str);
        if (l != null) {
            return l.longValue();
        }
        return 0L;
    }

    @Override // org.wso2.carbon.event.processor.manager.core.EventManagementService
    public long getClusterTimeInMillis() {
        if (EventManagementServiceValueHolder.getHazelcastInstance() == null) {
            throw new RuntimeException("No HazelcastInstance found.");
        }
        if (EventManagementServiceValueHolder.getHazelcastInstance().getCluster() == null) {
            throw new RuntimeException("No Cluster was found in the HazelcastInstance.");
        }
        try {
            return EventManagementServiceValueHolder.getHazelcastInstance().getCluster().getClusterTime();
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    public void initPersistence() {
        if (this.persistenceManager != null) {
            this.persistenceManager.init();
        }
    }

    public void stopPersistence() {
        if (this.persistenceManager != null) {
            this.persistenceManager.shutdown();
        }
    }
}
