package org.wso2.carbon.event.processor.manager.core.internal;

import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.map.IMap;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.wso2.carbon.event.processor.manager.commons.utils.ByteSerializer;
import org.wso2.carbon.event.processor.manager.commons.utils.HostAndPort;
import org.wso2.carbon.event.processor.manager.core.EventProcessorManagementService;
import org.wso2.carbon.event.processor.manager.core.EventPublisherManagementService;
import org.wso2.carbon.event.processor.manager.core.EventReceiverManagementService;
import org.wso2.carbon.event.processor.manager.core.Manager;
import org.wso2.carbon.event.processor.manager.core.config.HAConfiguration;
import org.wso2.carbon.event.processor.manager.core.config.PersistenceConfiguration;
import org.wso2.carbon.event.processor.manager.core.exception.EventManagementException;
import org.wso2.carbon.event.processor.manager.core.internal.ds.EventManagementServiceValueHolder;
import org.wso2.carbon.event.processor.manager.core.internal.thrift.ManagementServiceClientThriftImpl;
import org.wso2.carbon.event.processor.manager.core.internal.thrift.ManagementServiceImpl;
import org.wso2.carbon.event.processor.manager.core.internal.thrift.service.ManagementService;
import org.wso2.carbon.event.processor.manager.core.internal.util.ConfigurationConstants;

/* loaded from: input_file:org/wso2/carbon/event/processor/manager/core/internal/HAManager.class */
public class HAManager {
    /** Logger shared by this class and its nested helper classes. */
    private static final Log log = LogFactory.getLog(HAManager.class);
    /** Executor that runs the periodic state-change attempts and the background state-sync task. */
    private final ScheduledExecutorService executorService;
    /** First handler passed to the constructor; sync publishers are registered on it and its processing is enabled. */
    private final EventHandler receiverEventHandler;
    /** Second handler passed to the constructor; its event sync is switched on only while this member is active. */
    private final EventHandler presenterEventHandler;
    /** Thrift server started in the constructor and stopped in {@link #shutdown()}. */
    private final SnapshotServer snapshotServer;
    /** Configuration of this member; published in {@link #roleToMembershipMap} under the role it holds. */
    private HAConfiguration haConfiguration;
    /** True once this member has obtained {@link #activeLock}. */
    private boolean activeLockAcquired;
    /** True while this member holds {@link #passiveLock}. */
    private boolean passiveLockAcquired;
    /** True after this member has fallen back to the backup role. */
    private boolean isBackup;
    /** Set once {@link #syncState} completes; when set, becoming active skips restoring the last persisted state. */
    private boolean synced;
    /** CP-subsystem lock named by {@link #activeId}. */
    private FencedLock activeLock;
    /** CP-subsystem lock named by {@link #passiveId}. */
    private FencedLock passiveLock;
    /** Distributed map from role id (active / passive) to the configuration of the member holding that role. */
    private IMap<String, HAConfiguration> roleToMembershipMap;
    /** Handle of the most recently rescheduled {@link PeriodicStateChanger}; stays null until one reschedules itself. */
    private Future stateChanger = null;
    /** Key / lock name used for the active role. */
    private String activeId = ConfigurationConstants.ACTIVEID;
    /** Key / lock name used for the passive role. */
    private String passiveId = ConfigurationConstants.PASSIVEID;
    /** Last peer configuration looked up by this member; written here but not read anywhere in this class. */
    private HAConfiguration otherMember;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/carbon/event/processor/manager/core/internal/HAManager$PeriodicStateChanger.class */
    public class PeriodicStateChanger implements Runnable {
        PeriodicStateChanger() {
        }

        @Override // java.lang.Runnable
        public void run() {
            HAManager.this.tryChangeState();
            if (HAManager.this.activeLockAcquired) {
                return;
            }
            HAManager.this.stateChanger = HAManager.this.executorService.schedule(this, HAManager.this.haConfiguration.getManagementTryStateChangeInterval(), TimeUnit.MILLISECONDS);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/carbon/event/processor/manager/core/internal/HAManager$SnapshotServer.class */
    /**
     * Thrift thread-pool server exposing the {@code ManagementService} interface on this member's
     * management host and port.
     */
    public class SnapshotServer {
        /** The running server, or null if {@link #start} has not been called or failed. */
        private TServer dataReceiverServer;

        /** Runnable that runs the given server's {@link TServer#serve()} on its own thread. */
        public class ServerThread implements Runnable {
            private final TServer server;

            ServerThread(TServer tServer) {
                this.server = tServer;
            }

            @Override // java.lang.Runnable
            public void run() {
                this.server.serve();
            }
        }

        SnapshotServer() {
        }

        /**
         * Binds the server to the management endpoint of the given configuration and starts
         * serving on a new thread. Failures are logged and leave the server unset; they are not
         * propagated to the caller.
         *
         * @param hAConfiguration configuration supplying the management host and port
         */
        public void start(HAConfiguration hAConfiguration) {
            HostAndPort managementConfig = hAConfiguration.getManagementConfig();
            try {
                InetSocketAddress bindAddress = new InetSocketAddress(managementConfig.getHostName(), managementConfig.getPort());
                this.dataReceiverServer = new TThreadPoolServer(
                        new TThreadPoolServer.Args(new TServerSocket(bindAddress))
                                .processor(new ManagementService.Processor(new ManagementServiceImpl())));
                Thread thread = new Thread(new ServerThread(this.dataReceiverServer));
                HAManager.log.info("CEP HA Snapshot Server started on " + managementConfig.getHostName() + ":" + managementConfig.getPort());
                thread.start();
            } catch (TTransportException e) {
                HAManager.log.error("Cannot start CEP HA Snapshot Server on port " + managementConfig.getHostName() + ":" + managementConfig.getPort(), e);
            } catch (Throwable th) {
                HAManager.log.error("Error in starting CEP HA Snapshot Server ", th);
            }
        }

        /**
         * Stops the server if it was started. Does nothing when {@link #start} failed or was
         * never called, instead of failing with a NullPointerException.
         */
        public void shutDown() {
            if (this.dataReceiverServer != null) {
                this.dataReceiverServer.stop();
            }
        }
    }

    /**
     * Wires this manager to the cluster: obtains the active and passive locks, records the local
     * member UUID in the configuration, starts the snapshot server, and registers a listener that
     * re-runs {@link #tryChangeState()} when the active entry is removed from the role map.
     *
     * @param hazelcastInstance        cluster handle used for locks, the local member and the role map
     * @param hAConfiguration          configuration of this member
     * @param scheduledExecutorService executor for background tasks
     * @param eventHandler             handler stored as the receiver event handler
     * @param eventHandler2            handler stored as the presenter event handler; its event sync starts disabled
     */
    public HAManager(HazelcastInstance hazelcastInstance, HAConfiguration hAConfiguration, ScheduledExecutorService scheduledExecutorService, EventHandler eventHandler, EventHandler eventHandler2) {
        this.haConfiguration = hAConfiguration;
        this.executorService = scheduledExecutorService;
        this.receiverEventHandler = eventHandler;
        this.presenterEventHandler = eventHandler2;

        this.activeLock = hazelcastInstance.getCPSubsystem().getLock(this.activeId);
        this.passiveLock = hazelcastInstance.getCPSubsystem().getLock(this.passiveId);

        String localMemberUuid = hazelcastInstance.getCluster().getLocalMember().getUuid().toString();
        this.haConfiguration.setMemberUuid(localMemberUuid);

        this.presenterEventHandler.allowEventSync(false);

        this.snapshotServer = new SnapshotServer();
        this.snapshotServer.start(hAConfiguration);

        this.roleToMembershipMap = hazelcastInstance.getMap(ConfigurationConstants.ROLE_MEMBERSHIP_MAP);
        EntryAdapter<String, HAConfiguration> activeEntryWatcher = new EntryAdapter<String, HAConfiguration>() {
            @Override
            public void entryRemoved(EntryEvent<String, HAConfiguration> entryEvent) {
                HAManager.this.tryChangeState();
            }
        };
        // Listen on the active key only; entry values are not needed by the callback.
        this.roleToMembershipMap.addEntryListener(activeEntryWatcher, this.activeId, false);
    }

    /**
     * Performs the first state-change attempt and, if this member did not become active,
     * hands a {@link PeriodicStateChanger} to the executor to keep trying.
     */
    public void init() {
        tryChangeState();
        if (!this.activeLockAcquired) {
            this.executorService.execute(new PeriodicStateChanger());
        }
    }

    /**
     * Attempts to move this member to a higher role.
     * <ul>
     *   <li>Holding a role lock: a passive, non-backup member tries the active lock and is
     *       promoted on success.</li>
     *   <li>Holding no role lock: tries the passive lock; on success it also tries the active
     *       lock and becomes active or passive accordingly.</li>
     *   <li>Holding no role lock, passive lock unavailable, and not yet a backup: becomes a
     *       backup.</li>
     * </ul>
     * The active flag of the configuration is refreshed at the end in every case.
     */
    public void tryChangeState() {
        boolean holdsRoleLock = this.activeLockAcquired || this.passiveLockAcquired;
        if (holdsRoleLock) {
            if (!this.activeLockAcquired && !this.isBackup && this.activeLock.tryLock()) {
                promoteToActive();
            }
        } else if (this.passiveLock.tryLock()) {
            this.passiveLockAcquired = true;
            if (this.activeLock.tryLock()) {
                promoteToActive();
            } else {
                becomePassive();
                this.isBackup = false;
            }
        } else if (!this.isBackup) {
            becomeBackup();
            this.isBackup = true;
        }
        this.haConfiguration.setActive(this.activeLockAcquired);
    }

    /** Marks this member active, runs the activation steps, then releases the passive lock. */
    private void promoteToActive() {
        this.activeLockAcquired = true;
        becomeActive();
        this.passiveLockAcquired = false;
        this.passiveLock.unlock();
    }

    /**
     * Checks that a member believing itself active is still the one registered under the active
     * key of the role map. If another member UUID is registered, this member steps down: to
     * passive when the passive lock can be taken, otherwise to backup. In both cases a new
     * {@link PeriodicStateChanger} is submitted. The active flag of the configuration is
     * refreshed at the end.
     */
    public void verifyState() {
        if (this.activeLockAcquired) {
            HAConfiguration registeredActive = this.roleToMembershipMap.get(this.activeId);
            boolean stillRegistered = registeredActive.getMemberUuid().equalsIgnoreCase(this.haConfiguration.getMemberUuid());
            if (!stillRegistered) {
                boolean gotPassiveLock = this.passiveLock.tryLock();
                this.passiveLockAcquired = gotPassiveLock;
                this.activeLockAcquired = false;
                if (gotPassiveLock) {
                    becomePassive();
                } else {
                    becomeBackup();
                }
                this.isBackup = !gotPassiveLock;
                this.executorService.execute(new PeriodicStateChanger());
            }
        }
        this.haConfiguration.setActive(this.activeLockAcquired);
    }

    /**
     * Captures the current receiver and processor state as one serialized map keyed by
     * {@link Manager.ManagerType}.
     * <p>
     * Before capturing, the member registered under the passive key is recorded and its event
     * sync endpoint is added to the receiver event handler. Processing is paused while the state
     * is collected and is always resumed afterwards, even when collection or serialization fails.
     *
     * @return the serialized state map
     */
    public byte[] getState() {
        CarbonEventManagementService managementService = EventManagementServiceValueHolder.getCarbonEventManagementService();
        EventReceiverManagementService receiverService = managementService.getEventReceiverManagementService();
        EventProcessorManagementService processorService = managementService.getEventProcessorManagementService();

        HAConfiguration passiveMember = this.roleToMembershipMap.get(this.passiveId);
        this.otherMember = passiveMember;
        this.receiverEventHandler.addEventPublisher(passiveMember.getEventSyncConfig());

        HashMap<Manager.ManagerType, byte[]> stateByManager = new HashMap<>();
        if (processorService != null) {
            processorService.pause();
        }
        try {
            if (receiverService != null) {
                receiverService.pause();
                stateByManager.put(Manager.ManagerType.Receiver, receiverService.getState());
            }
            if (processorService != null) {
                stateByManager.put(Manager.ManagerType.Processor, processorService.getState());
            }
            return ByteSerializer.OToB(stateByManager);
        } finally {
            // Resume in the original order so a failure above cannot leave event flow paused.
            if (processorService != null) {
                processorService.resume();
            }
            if (receiverService != null) {
                receiverService.resume();
            }
        }
    }

    /**
     * Releases whatever role this member holds (removing its entry from the role map and
     * unlocking the matching lock), cancels any pending state-change attempt, stops the snapshot
     * server, and stops persistence.
     */
    public void shutdown() {
        CarbonEventManagementService carbonEventManagementService = EventManagementServiceValueHolder.getCarbonEventManagementService();
        if (this.passiveLockAcquired) {
            this.roleToMembershipMap.remove(this.passiveId);
            this.passiveLock.unlock();
        }
        if (this.activeLockAcquired) {
            this.roleToMembershipMap.remove(this.activeId);
            this.activeLock.unlock();
        }
        // stateChanger is assigned only when a PeriodicStateChanger reschedules itself, so it is
        // still null if this member became active on its first attempt.
        Future<?> pendingStateChange = this.stateChanger;
        if (pendingStateChange != null) {
            pendingStateChange.cancel(false);
        }
        if (this.snapshotServer != null) {
            this.snapshotServer.shutDown();
        }
        carbonEventManagementService.stopPersistence();
    }

    /**
     * Switches this member to the active role: registers it under the active key, optionally
     * restores the last persisted state (only when no state sync has completed and persistence is
     * enabled), stops publishers from dropping events, starts the receivers, enables processing
     * and event sync, and initialises persistence.
     */
    private void becomeActive() {
        CarbonEventManagementService carbonEventManagementService = EventManagementServiceValueHolder.getCarbonEventManagementService();
        EventReceiverManagementService eventReceiverManagementService = carbonEventManagementService.getEventReceiverManagementService();
        List<EventPublisherManagementService> eventPublisherManagementService = carbonEventManagementService.getEventPublisherManagementService();
        EventProcessorManagementService eventProcessorManagementService = carbonEventManagementService.getEventProcessorManagementService();
        this.roleToMembershipMap.set(this.activeId, this.haConfiguration);
        this.otherMember = null;
        PersistenceConfiguration persistenceConfiguration = carbonEventManagementService.getManagementModeInfo().getPersistenceConfiguration();
        boolean persistenceEnabled = persistenceConfiguration != null && persistenceConfiguration.isPersistenceEnabled();
        // The processor service may be absent (other methods in this class null-check it), so
        // guard here too rather than failing activation with a NullPointerException.
        if (!this.synced && persistenceEnabled && eventProcessorManagementService != null) {
            eventProcessorManagementService.restoreLastState();
            log.info("Restored to Last Known State.");
        }
        if (eventPublisherManagementService != null) {
            for (EventPublisherManagementService publisherService : eventPublisherManagementService) {
                publisherService.setDrop(false);
            }
        }
        if (eventReceiverManagementService != null) {
            eventReceiverManagementService.start();
        }
        this.receiverEventHandler.allowContinueProcess(true);
        this.presenterEventHandler.allowEventSync(true);
        carbonEventManagementService.initPersistence();
        log.info("Became CEP HA Active Member");
    }

    /**
     * Switches this member to the passive role: registers it under the passive key, adds the
     * active member's event sync endpoint to the receiver handler, enables processing, disables
     * presenter event sync, and submits a background task that pulls state from the active
     * member, retrying after the configured interval on {@link EventManagementException}.
     * <p>
     * Any exception raised while setting up the role is logged and not propagated.
     */
    private void becomePassive() {
        try {
            this.roleToMembershipMap.set(this.passiveId, this.haConfiguration);
            final HAConfiguration hAConfiguration = this.roleToMembershipMap.get(this.activeId);
            this.otherMember = hAConfiguration;
            final CarbonEventManagementService carbonEventManagementService = EventManagementServiceValueHolder.getCarbonEventManagementService();
            this.receiverEventHandler.addEventPublisher(hAConfiguration.getEventSyncConfig());
            this.receiverEventHandler.allowContinueProcess(true);
            this.presenterEventHandler.allowEventSync(false);
            this.executorService.execute(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    while (true) {
                        try {
                            HAManager.log.info("CEP HA State syncing started..");
                            HAManager.this.syncState(hAConfiguration, carbonEventManagementService);
                            HAManager.log.info("CEP HA State successfully synced.");
                            return;
                        } catch (EventManagementException e) {
                            HAManager.log.error("CEP HA State syncing failed, " + e.getMessage(), e);
                            try {
                                Thread.sleep(HAManager.this.haConfiguration.getManagementStateSyncRetryInterval());
                            } catch (InterruptedException interrupted) {
                                // Restore the interrupt flag and stop retrying so the executor
                                // can shut this task down instead of looping forever.
                                Thread.currentThread().interrupt();
                                HAManager.log.warn("CEP HA State syncing interrupted, no further retries.");
                                return;
                            }
                        }
                    }
                }
            });
            log.info("Became CEP HA Passive Member");
        } catch (Exception e) {
            log.error("Error while setting the member as passive member. ", e);
        }
    }

    /**
     * Switches this member to the backup role: registers the passive member's event sync
     * endpoint on the receiver handler and disables presenter event sync.
     */
    private void becomeBackup() {
        EventManagementServiceValueHolder.getCarbonEventManagementService();

        // NOTE(review): the passive member's sync config is looked up and registered twice.
        // One of these may have been meant to use the active member's entry - confirm.
        HAConfiguration passiveMember = this.roleToMembershipMap.get(this.passiveId);
        this.receiverEventHandler.addEventPublisher(passiveMember.getEventSyncConfig());
        HAConfiguration passiveMemberAgain = this.roleToMembershipMap.get(this.passiveId);
        this.receiverEventHandler.addEventPublisher(passiveMemberAgain.getEventSyncConfig());

        this.presenterEventHandler.allowEventSync(false);
        log.info("Became CEP HA Backup Member");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Pulls a state snapshot from the given member and applies it locally.
     * <p>
     * Receivers are started and paused, the processor is paused, and publishers are told to drop
     * events. The snapshot is then fetched over the management endpoint, deserialized, and
     * applied to the processor and receiver services. Processor and receiver are always resumed
     * afterwards, whether or not the snapshot could be applied; {@code synced} is set only on
     * success.
     *
     * @param hAConfiguration               configuration of the member to fetch the snapshot from
     * @param carbonEventManagementService  source of the receiver, processor and publisher services
     */
    public void syncState(HAConfiguration hAConfiguration, CarbonEventManagementService carbonEventManagementService) {
        EventReceiverManagementService receiverService = carbonEventManagementService.getEventReceiverManagementService();
        EventProcessorManagementService processorService = carbonEventManagementService.getEventProcessorManagementService();
        List<EventPublisherManagementService> publisherServices = carbonEventManagementService.getEventPublisherManagementService();
        if (receiverService != null) {
            receiverService.start();
            receiverService.pause();
        }
        if (processorService != null) {
            processorService.pause();
        }
        if (publisherServices != null) {
            for (EventPublisherManagementService publisherService : publisherServices) {
                publisherService.setDrop(true);
            }
        }
        try {
            byte[] snapshotBytes = null;
            try {
                snapshotBytes = new ManagementServiceClientThriftImpl().getSnapshot(hAConfiguration.getManagementConfig());
            } catch (Exception e) {
                // Log with the stack trace; the failure then surfaces when the snapshot is decoded.
                log.error("Error while fetching CEP HA state snapshot. ", e);
            }
            HashMap<?, ?> snapshot = (HashMap<?, ?>) ByteSerializer.BToO(snapshotBytes);
            if (processorService != null) {
                processorService.restoreState((byte[]) snapshot.get(Manager.ManagerType.Processor));
            }
            if (receiverService != null) {
                receiverService.syncState((byte[]) snapshot.get(Manager.ManagerType.Receiver));
            }
            this.synced = true;
        } finally {
            // Covers every failure after pausing, not only a failed processor restore.
            if (processorService != null) {
                processorService.resume();
            }
            if (receiverService != null) {
                receiverService.resume();
            }
        }
    }
}
