package org.wso2.carbon.stream.processor.core.ha;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.carbon.cluster.coordinator.commons.node.NodeDetail;
import org.wso2.carbon.cluster.coordinator.service.ClusterCoordinator;
import org.wso2.carbon.databridge.commons.ServerEventListener;
import org.wso2.carbon.stream.processor.common.HAStateChangeListener;
import org.wso2.carbon.stream.processor.core.DeploymentMode;
import org.wso2.carbon.stream.processor.core.NodeInfo;
import org.wso2.carbon.stream.processor.core.event.queue.EventListMapManager;
import org.wso2.carbon.stream.processor.core.ha.tcp.TCPServer;
import org.wso2.carbon.stream.processor.core.ha.transport.EventSyncConnectionPoolManager;
import org.wso2.carbon.stream.processor.core.ha.util.HAConstants;
import org.wso2.carbon.stream.processor.core.internal.StreamProcessorDataHolder;
import org.wso2.carbon.stream.processor.core.internal.beans.DeploymentConfig;
import org.wso2.carbon.stream.processor.core.internal.beans.EventSyncClientPoolConfig;
import org.wso2.carbon.stream.processor.core.persistence.PersistenceManager;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.exception.CannotRestoreSiddhiAppStateException;
import org.wso2.siddhi.core.stream.input.source.SourceHandler;
import org.wso2.siddhi.core.stream.output.sink.SinkHandler;
import org.wso2.siddhi.core.table.record.RecordTableHandler;
import org.wso2.siddhi.core.util.transport.BackoffRetryCounter;

/* loaded from: input_file:org/wso2/carbon/stream/processor/core/ha/HAManager.class */
/**
 * Manages the active/passive role of this node in a minimum HA deployment.
 *
 * <p>{@link #start()} picks the initial role from the cluster coordinator's leader election,
 * while {@link #changeToActive()} and {@link #changeToPassive()} switch the role afterwards.
 */
public class HAManager {
    private static final Logger log = Logger.getLogger(HAManager.class);

    /** Property keys under which a passive node publishes its event sync endpoint. */
    private static final String PORT_KEY = "port";
    private static final String ADVERTISED_HOST_KEY = "advertisedHost";
    private static final String ADVERTISED_PORT_KEY = "advertisedPort";

    /** Sleep between two polls of a condition this class waits on. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    private static final Map<String, Object> passiveNodeDetailsPropertiesMap = new HashMap<>();

    private ClusterCoordinator clusterCoordinator;
    private boolean isActiveNode;
    private String nodeId;
    private String clusterId;
    private HACoordinationSourceHandlerManager sourceHandlerManager;
    private HACoordinationSinkHandlerManager sinkHandlerManager;
    private HACoordinationRecordTableHandlerManager recordTableHandlerManager;
    private DeploymentConfig deploymentConfig;
    private EventSyncClientPoolConfig eventSyncClientPoolConfig;
    private boolean passiveNodeAdded;
    private String host;
    private int port;
    private TCPServer tcpServerInstance = TCPServer.getInstance();
    private EventListMapManager eventListMapManager = new EventListMapManager();

    /**
     * Creates a manager for the given node.
     *
     * @param clusterCoordinator coordinator used for leader election and node properties
     * @param nodeId             id of this node
     * @param clusterId          id of the cluster (group) this node belongs to
     * @param deploymentConfig   deployment configuration; also supplies the TCP client pool config
     */
    public HAManager(ClusterCoordinator clusterCoordinator, String nodeId, String clusterId,
                     DeploymentConfig deploymentConfig) {
        this.clusterCoordinator = clusterCoordinator;
        this.nodeId = nodeId;
        this.clusterId = clusterId;
        this.deploymentConfig = deploymentConfig;
        this.eventSyncClientPoolConfig = deploymentConfig.getTcpClientPoolConfig();
    }

    /**
     * Registers the HA handler managers, waits until the cluster has a leader and then starts this
     * node as active (when it is the leader) or passive (otherwise). Finally publishes the node
     * details to {@link NodeInfo} and starts statistics reporting when statistics are enabled.
     *
     * <p>If the calling thread is interrupted while waiting for the leader, the wait continues and
     * the interrupt flag is restored once start-up has finished.
     */
    public void start() {
        registerHandlerManagers();
        this.clusterCoordinator.registerEventListener(new HAEventListener());

        boolean interrupted = awaitLeaderElection();
        try {
            this.isActiveNode = this.clusterCoordinator.isLeaderNode();
            if (this.isActiveNode) {
                startAsActiveNode();
            } else {
                startAsPassiveNode();
            }

            NodeInfo nodeInfo = StreamProcessorDataHolder.getNodeInfo();
            nodeInfo.setMode(DeploymentMode.MINIMUM_HA);
            nodeInfo.setNodeId(this.nodeId);
            nodeInfo.setGroupId(this.clusterId);
            nodeInfo.setActiveNode(this.isActiveNode);
            if (StreamProcessorDataHolder.isStatisticsEnabled()) {
                StreamProcessorDataHolder.getStatisticsManager().startReporting();
            }
        } finally {
            // Restored only now so that the remaining start-up steps are not cut short by it.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Creates the HA handler managers and installs them in the data holder and the SiddhiManager. */
    private void registerHandlerManagers() {
        this.sourceHandlerManager = new HACoordinationSourceHandlerManager();
        this.sinkHandlerManager = new HACoordinationSinkHandlerManager();
        this.recordTableHandlerManager = new HACoordinationRecordTableHandlerManager();
        StreamProcessorDataHolder.setSinkHandlerManager(this.sinkHandlerManager);
        StreamProcessorDataHolder.setSourceHandlerManager(this.sourceHandlerManager);
        StreamProcessorDataHolder.setRecordTableHandlerManager(this.recordTableHandlerManager);
        SiddhiManager siddhiManager = StreamProcessorDataHolder.getSiddhiManager();
        siddhiManager.setSourceHandlerManager(StreamProcessorDataHolder.getSourceHandlerManager());
        siddhiManager.setSinkHandlerManager(StreamProcessorDataHolder.getSinkHandlerManager());
        siddhiManager.setRecordTableHandlerManager(StreamProcessorDataHolder.getRecordTableHandlerManager());
    }

    /**
     * Polls the coordinator until it reports a leader node.
     *
     * @return {@code true} if the thread was interrupted while waiting; the caller is expected to
     *         restore the interrupt flag when it is done
     */
    private boolean awaitLeaderElection() {
        boolean interrupted = false;
        while (this.clusterCoordinator.getLeaderNode() == null) {
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                interrupted = true;
                log.warn("Error in waiting for leader node");
            }
        }
        return interrupted;
    }

    /**
     * Start-up path of the active node: notifies the state listeners and, when another node has
     * already joined the cluster, connects to it as the passive node and runs a persistence cycle.
     */
    private void startAsActiveNode() {
        log.info("HA Deployment: Starting up as Active Node");
        for (HAStateChangeListener listener : StreamProcessorDataHolder.getHaStateChangeListenerList()) {
            listener.becameActive();
        }

        // Properties of the other node; when several are present the last one listed wins.
        Map<?, ?> passiveNodeProperties = null;
        for (NodeDetail nodeDetail : this.clusterCoordinator.getAllNodeDetails()) {
            if (!nodeDetail.getNodeId().equals(this.nodeId)) {
                passiveNodeProperties = nodeDetail.getPropertiesMap();
            }
        }
        if (passiveNodeProperties == null) {
            return;
        }

        setPassiveNodeAdded(true);
        setPassiveNodeHostPort(getHost(passiveNodeProperties), getPort(passiveNodeProperties));
        initializeEventSyncConnectionPool();
        for (SourceHandler sourceHandler : this.sourceHandlerManager.getRegsiteredSourceHandlers().values()) {
            ((HACoordinationSourceHandler) sourceHandler).setPassiveNodeAdded(true);
        }
        new PersistenceManager().run();
    }

    /**
     * Start-up path of the passive node: publishes the event sync endpoint through the cluster
     * coordinator, starts the TCP server and notifies the state listeners.
     */
    private void startAsPassiveNode() {
        log.info("HA Deployment: Starting up as Passive Node");
        passiveNodeDetailsPropertiesMap.put(HAConstants.HOST,
                this.deploymentConfig.eventSyncServerConfigs().getHost());
        passiveNodeDetailsPropertiesMap.put(PORT_KEY,
                Integer.valueOf(this.deploymentConfig.eventSyncServerConfigs().getPort()));
        passiveNodeDetailsPropertiesMap.put(ADVERTISED_HOST_KEY,
                this.deploymentConfig.eventSyncServerConfigs().getAdvertisedHost());
        passiveNodeDetailsPropertiesMap.put(ADVERTISED_PORT_KEY,
                Integer.valueOf(this.deploymentConfig.eventSyncServerConfigs().getAdvertisedPort()));
        this.clusterCoordinator.setPropertiesMap(passiveNodeDetailsPropertiesMap);
        EventListMapManager.initializeEventListMap();
        this.tcpServerInstance.start(this.deploymentConfig);
        for (HAStateChangeListener listener : StreamProcessorDataHolder.getHaStateChangeListenerList()) {
            listener.becamePassive();
        }
    }

    /**
     * Switches this node from passive to active; does nothing when it is already active.
     *
     * <p>The steps run in this order: mark the node and its Siddhi apps active, stop the TCP
     * server, restore the last persisted revision, wait for the event byte buffer queue to empty,
     * replay the buffered events in playback mode, activate sinks and record tables, start the
     * sources and finally notify the server and state listeners.
     *
     * <p>An interrupt received during the transition does not abort it; the interrupt flag is
     * restored when the method returns.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void changeToActive() {
        if (this.isActiveNode) {
            return;
        }
        log.info("HA Deployment: This Node is now becoming the Active Node");
        this.isActiveNode = true;
        changeSiddhiAppState(true);
        StreamProcessorDataHolder.getNodeInfo().setActiveNode(this.isActiveNode);
        this.tcpServerInstance.stop();
        syncState();

        boolean interrupted = false;
        try {
            interrupted = awaitEventByteBufferQueueDrained();
            this.tcpServerInstance.clearResources();

            enableEventTimeClock(true);
            startSiddhiAppRuntimeWithoutSources();
            try {
                this.eventListMapManager.trimAndSendToInputHandler();
            } catch (InterruptedException e) {
                interrupted = true;
                log.warn("Error in sending events to input handler.", e);
            }
            enableEventTimeClock(false);

            activateSinkHandlers();
            activateRecordTableHandlers();
            startSiddhiAppRuntimeSources();

            for (ServerEventListener serverListener : StreamProcessorDataHolder.getServerListeners()) {
                serverListener.start();
            }
            for (HAStateChangeListener listener : StreamProcessorDataHolder.getHaStateChangeListenerList()) {
                listener.becameActive();
            }
            log.info("Successfully Changed to Active Mode ");
        } finally {
            // Restored at the very end so that later blocking steps of the transition still run.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Polls the event sync server's byte buffer queue (if there is one) until it is empty.
     *
     * @return {@code true} if the thread was interrupted while waiting; the caller is expected to
     *         restore the interrupt flag when it is done
     */
    private boolean awaitEventByteBufferQueueDrained() {
        boolean interrupted = false;
        if (null != this.tcpServerInstance.getEventSyncServer().getEventByteBufferQueue()) {
            while (this.tcpServerInstance.getEventSyncServer().getEventByteBufferQueue().peek() != null) {
                try {
                    Thread.sleep(POLL_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    interrupted = true;
                    log.warn("Error in checking byte buffer queue empty");
                }
            }
        }
        return interrupted;
    }

    /** Marks every registered sink handler active; a sink that fails is logged and skipped. */
    private void activateSinkHandlers() {
        for (SinkHandler sinkHandler : this.sinkHandlerManager.getRegisteredSinkHandlers().values()) {
            try {
                ((HACoordinationSinkHandler) sinkHandler).setAsActive();
            } catch (Throwable t) {
                log.error("HA Deployment: Error when connecting to sink " + sinkHandler.getElementId()
                        + " while changing from passive state to active, skipping the sink. ", t);
            }
        }
    }

    /** Marks every registered record table handler active; a failing table is retried later. */
    private void activateRecordTableHandlers() {
        for (RecordTableHandler recordTableHandler
                : this.recordTableHandlerManager.getRegisteredRecordTableHandlers().values()) {
            try {
                ((HACoordinationRecordTableHandler) recordTableHandler).setAsActive();
            } catch (Throwable t) {
                scheduleRecordTableRetry(recordTableHandler, t);
            }
        }
    }

    /**
     * Logs the failure and schedules a {@link RetryRecordTableConnection} for the given handler.
     *
     * <p>Each failed table gets its own retry counter, so the back-off of one table cannot be
     * reset or advanced by the failure of another.
     *
     * @param recordTableHandler handler whose activation failed
     * @param cause              the failure raised by the activation attempt
     */
    private void scheduleRecordTableRetry(RecordTableHandler recordTableHandler, Throwable cause) {
        BackoffRetryCounter retryCounter = new BackoffRetryCounter();
        log.error("HA Deployment: Error in connecting to table "
                + ((HACoordinationRecordTableHandler) recordTableHandler).getTableId()
                + " while changing from passive state to active, will retry in "
                + retryCounter.getTimeInterval(), cause);
        ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor();
        retryCounter.increment();
        retryExecutor.schedule(
                new RetryRecordTableConnection(retryCounter, recordTableHandler, retryExecutor),
                retryCounter.getTimeIntervalMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Switches this node to passive: stops the server listeners and the Siddhi app runtimes, marks
     * the node and its Siddhi apps inactive, re-initialises the event list map, starts the TCP
     * server and notifies the state listeners.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void changeToPassive() {
        log.info("HA Deployment: This Node is now becoming the Passive Node");
        for (ServerEventListener serverListener : StreamProcessorDataHolder.getServerListeners()) {
            serverListener.stop();
        }
        stopSiddhiAppRuntimes();
        this.isActiveNode = false;
        changeSiddhiAppState(false);
        EventListMapManager.initializeEventListMap();
        StreamProcessorDataHolder.getNodeInfo().setActiveNode(this.isActiveNode);
        this.tcpServerInstance.start(this.deploymentConfig);
        for (HAStateChangeListener listener : StreamProcessorDataHolder.getHaStateChangeListenerList()) {
            listener.becamePassive();
        }
        log.info("Successfully Changed to Passive Mode ");
    }

    /**
     * Restores the last persisted revision of every Siddhi app runtime. A runtime that cannot be
     * restored is logged and skipped; each successful restore updates the node's sync details.
     */
    private void syncState() {
        StreamProcessorDataHolder.getSiddhiManager().getSiddhiAppRuntimeMap().forEach((appName, runtime) -> {
            if (log.isDebugEnabled()) {
                log.debug("Restoring state of Siddhi Application " + runtime.getName());
            }
            try {
                runtime.restoreLastRevision();
                StreamProcessorDataHolder.getNodeInfo().setLastSyncedTimestamp(System.currentTimeMillis());
                StreamProcessorDataHolder.getNodeInfo().setInSync(true);
            } catch (CannotRestoreSiddhiAppStateException e) {
                log.error("Error in restoring Siddhi Application: " + runtime.getName(), e);
            }
        });
        if (log.isDebugEnabled()) {
            log.debug("Successfully Synced the state ");
        }
    }

    /**
     * Turns playback mode on or off for every Siddhi app runtime.
     *
     * @param enabled {@code true} to enable playback, {@code false} to disable it
     */
    private void enableEventTimeClock(boolean enabled) {
        StreamProcessorDataHolder.getSiddhiManager().getSiddhiAppRuntimeMap().forEach((appName, runtime) -> {
            if (log.isDebugEnabled()) {
                log.debug("Changed Event Play back mode '" + enabled + "' for Siddhi Application "
                        + runtime.getName());
            }
            runtime.enablePlayBack(enabled, (Long) null, (Long) null);
        });
    }

    /** Starts every Siddhi app runtime without starting its sources. */
    private void startSiddhiAppRuntimeWithoutSources() {
        StreamProcessorDataHolder.getSiddhiManager().getSiddhiAppRuntimeMap().forEach((appName, runtime) -> {
            if (log.isDebugEnabled()) {
                log.debug("Starting without sources of Siddhi Application " + runtime.getName());
            }
            runtime.startWithoutSources();
        });
    }

    /** Starts the sources of every Siddhi app runtime. */
    private void startSiddhiAppRuntimeSources() {
        StreamProcessorDataHolder.getSiddhiManager().getSiddhiAppRuntimeMap().forEach((appName, runtime) -> {
            if (log.isDebugEnabled()) {
                log.debug("Starting sources of Siddhi Application " + runtime.getName());
            }
            runtime.startSources();
        });
    }

    /** Shuts down every Siddhi app runtime. */
    private void stopSiddhiAppRuntimes() {
        StreamProcessorDataHolder.getSiddhiManager().getSiddhiAppRuntimeMap().forEach((appName, runtime) -> {
            if (log.isDebugEnabled()) {
                log.debug("Stopping Siddhi Application " + runtime.getName());
            }
            runtime.shutdown();
        });
    }

    /**
     * Sets the active flag on every Siddhi app known to the stream processor service.
     *
     * @param active the new value of the flag
     */
    private void changeSiddhiAppState(boolean active) {
        StreamProcessorDataHolder.getStreamProcessorService().getSiddhiAppMap().forEach((appName, appData) -> {
            if (log.isDebugEnabled()) {
                log.debug("Changed Siddhi Application " + appName + " state to " + active);
            }
            appData.setActive(active);
        });
    }

    /**
     * Reads the host of the passive node, preferring the advertised host over the plain host.
     *
     * @param properties properties published by the passive node
     * @return the advertised host, or the plain host when no advertised host is set
     */
    private String getHost(Map<?, ?> properties) {
        Object hostValue = properties.get(ADVERTISED_HOST_KEY);
        if (hostValue == null) {
            hostValue = properties.get(HAConstants.HOST);
        }
        return (String) hostValue;
    }

    /**
     * Reads the port of the passive node, preferring the advertised port over the plain port.
     * The plain port is used when the advertised port is missing, is not an integer or is 0.
     *
     * @param properties properties published by the passive node
     * @return the port to connect to
     * @throws IllegalStateException if the plain port is needed but is missing or not an integer
     */
    private int getPort(Map<?, ?> properties) {
        Object advertisedPort = properties.get(ADVERTISED_PORT_KEY);
        if (advertisedPort instanceof Integer) {
            int advertised = (Integer) advertisedPort;
            if (advertised != 0) {
                return advertised;
            }
        } else {
            log.warn("Error in getting the advertisedPort from deployment yaml. "
                    + "Hence using port as the advertisedPort");
        }

        Object plainPort = properties.get(PORT_KEY);
        if (!(plainPort instanceof Integer)) {
            throw new IllegalStateException("Passive node did not publish a valid '" + PORT_KEY
                    + "' property: " + plainPort);
        }
        return (Integer) plainPort;
    }

    /** Initialises the event sync connection pool towards the stored passive node host and port. */
    public void initializeEventSyncConnectionPool() {
        EventSyncConnectionPoolManager.initializeConnectionPool(this.host, this.port, this.deploymentConfig);
    }

    /**
     * Stores the endpoint of the passive node.
     *
     * @param host host of the passive node
     * @param port port of the passive node
     */
    public void setPassiveNodeHostPort(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** @return {@code true} if this node is currently the active node */
    public boolean isActiveNode() {
        return this.isActiveNode;
    }

    /** @return the value last passed to {@link #setPassiveNodeAdded(boolean)} */
    public boolean isPassiveNodeAdded() {
        return this.passiveNodeAdded;
    }

    /**
     * Records whether a passive node is present.
     *
     * @param passiveNodeAdded the new value of the flag
     */
    public void setPassiveNodeAdded(boolean passiveNodeAdded) {
        this.passiveNodeAdded = passiveNodeAdded;
    }

    /** @return the id of this node */
    public String getNodeId() {
        return this.nodeId;
    }

    /** @return the deployment configuration this manager was created with */
    public DeploymentConfig getDeploymentConfig() {
        return this.deploymentConfig;
    }
}
