package org.wso2.carbon.stream.processor.core.ha;

import com.google.gson.Gson;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.http.cookie.ClientCookie;
import org.apache.log4j.Logger;
import org.wso2.carbon.cluster.coordinator.service.ClusterCoordinator;
import org.wso2.carbon.stream.processor.core.DeploymentMode;
import org.wso2.carbon.stream.processor.core.NodeInfo;
import org.wso2.carbon.stream.processor.core.ha.util.CompressionUtil;
import org.wso2.carbon.stream.processor.core.ha.util.RequestUtil;
import org.wso2.carbon.stream.processor.core.internal.StreamProcessorDataHolder;
import org.wso2.carbon.stream.processor.core.internal.beans.DeploymentConfig;
import org.wso2.carbon.stream.processor.core.model.HAStateSyncObject;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.exception.CannotRestoreSiddhiAppStateException;
import org.wso2.siddhi.core.stream.input.source.SourceHandler;

/* loaded from: input_file:org/wso2/carbon/stream/processor/core/ha/HAManager.class */
/**
 * Manages the role of this node in a two-node high-availability deployment
 * ({@link DeploymentMode#MINIMUM_HA}).
 *
 * <p>On {@link #start()} the manager installs the HA source/sink/record-table handler managers,
 * waits until the cluster coordinator reports a leader and then behaves as either the active
 * node (this node is the leader) or the passive node. {@link #changeToActive()} promotes a
 * passive node to active.
 */
public class HAManager {
    private static final Logger log = Logger.getLogger(HAManager.class);

    /** Keys under which the active node publishes its address in the coordinator properties map. */
    private static final String HOST_PROPERTY = "host";
    private static final String PORT_PROPERTY = "port";

    /** Polling interval, in milliseconds, while waiting for a leader node to be reported. */
    private static final long LEADER_POLL_INTERVAL_MILLIS = 100L;

    private static final Map<String, Object> activeNodePropertiesMap = new HashMap<String, Object>();

    private ClusterCoordinator clusterCoordinator;
    private ScheduledExecutorService passiveNodeOutputSchedulerService;
    private ScheduledFuture<?> passiveNodeOutputScheduledFuture;
    private boolean liveSyncEnabled;
    private int outputSyncInterval;
    private String localHost;
    private String localPort;
    private Timer syncAfterGracePeriodTimer;
    private int stateSyncGracePeriod;
    private boolean isActiveNode;
    private String nodeId;
    private String clusterId;
    private int sinkQueueCapacity;
    private int sourceQueueCapacity;
    private String username;
    private String password;
    private String activeNodeHost;
    private String activeNodePort;
    private HACoordinationSourceHandlerManager sourceHandlerManager;
    private HACoordinationSinkHandlerManager sinkHandlerManager;
    private HACoordinationRecordTableHandlerManager recordTableHandlerManager;
    private List<Timer> retrySiddhiAppSyncTimerList = new LinkedList<Timer>();
    private boolean isActiveNodeOutputSyncManagerStarted;

    /**
     * Creates a manager from the deployment configuration. Nothing is started until
     * {@link #start()} is called.
     *
     * @param clusterCoordinator coordinator used to determine the leader and to share properties
     * @param nodeId             identifier of this node, reported through {@link NodeInfo}
     * @param clusterId          identifier of the cluster (group), reported through {@link NodeInfo}
     * @param deploymentConfig   source of the live-sync settings, sync intervals and queue capacities
     */
    public HAManager(ClusterCoordinator clusterCoordinator, String nodeId, String clusterId,
                     DeploymentConfig deploymentConfig) {
        this.clusterCoordinator = clusterCoordinator;
        this.nodeId = nodeId;
        this.clusterId = clusterId;
        this.localHost = deploymentConfig.getLiveSync().getAdvertisedHost();
        this.localPort = String.valueOf(deploymentConfig.getLiveSync().getAdvertisedPort());
        this.liveSyncEnabled = deploymentConfig.getLiveSync().isEnabled();
        this.outputSyncInterval = deploymentConfig.getOutputSyncInterval();
        this.stateSyncGracePeriod = deploymentConfig.getStateSyncGracePeriod();
        this.sinkQueueCapacity = deploymentConfig.getSinkQueueCapacity();
        this.sourceQueueCapacity = deploymentConfig.getSourceQueueCapacity();
        this.username = deploymentConfig.getLiveSync().getUsername();
        this.password = deploymentConfig.getLiveSync().getPassword();
    }

    /**
     * Installs the HA handler managers, blocks until a leader node is known and then starts this
     * node in active or passive mode. Finally publishes mode, node id, group id and role through
     * {@link StreamProcessorDataHolder#getNodeInfo()}.
     */
    public void start() {
        sourceHandlerManager = new HACoordinationSourceHandlerManager(sourceQueueCapacity);
        sinkHandlerManager = new HACoordinationSinkHandlerManager(sinkQueueCapacity);
        // The record table handler manager is sized with the sink queue capacity.
        recordTableHandlerManager = new HACoordinationRecordTableHandlerManager(sinkQueueCapacity);
        StreamProcessorDataHolder.setSinkHandlerManager(sinkHandlerManager);
        StreamProcessorDataHolder.setSourceHandlerManager(sourceHandlerManager);
        StreamProcessorDataHolder.setRecordTableHandlerManager(recordTableHandlerManager);

        SiddhiManager siddhiManager = StreamProcessorDataHolder.getSiddhiManager();
        siddhiManager.setSourceHandlerManager(StreamProcessorDataHolder.getSourceHandlerManager());
        siddhiManager.setSinkHandlerManager(StreamProcessorDataHolder.getSinkHandlerManager());
        siddhiManager.setRecordTableHandlerManager(StreamProcessorDataHolder.getRecordTableHandlerManager());

        clusterCoordinator.registerEventListener(new HAEventListener());
        waitForLeaderNode();

        isActiveNode = clusterCoordinator.isLeaderNode();
        if (isActiveNode) {
            log.info("HA Deployment: Starting up as Active Node");
            publishLocalAddressAsActive();
            if (!liveSyncEnabled) {
                startActiveNodeOutputSyncManager();
            }
        } else {
            log.info("HA Deployment: Starting up as Passive Node");
            // NOTE(review): unlike getActiveNodeSiddhiAppSnapshot, the leader properties are not
            // null-checked here; a leader that has not published its address yet would cause a
            // NullPointerException - confirm whether the coordinator guarantees a non-null map.
            Map<?, ?> leaderProperties = clusterCoordinator.getLeaderNode().getPropertiesMap();
            activeNodeHost = (String) leaderProperties.get(HOST_PROPERTY);
            activeNodePort = (String) leaderProperties.get(PORT_PROPERTY);
            if (liveSyncEnabled) {
                log.info("Passive Node: Live Sync enabled. State sync from Active node scheduled after "
                        + (stateSyncGracePeriod / 1000) + " seconds");
                syncAfterGracePeriodTimer = liveSyncAfterGracePeriod(stateSyncGracePeriod);
            } else {
                log.info("Passive Node: Live Sync disabled. State sync from Active node scheduled after "
                        + (stateSyncGracePeriod / 1000) + " seconds");
                syncAfterGracePeriodTimer = persistenceStoreSyncAfterGracePeriod(stateSyncGracePeriod);
            }
            passiveNodeOutputSchedulerService = Executors.newSingleThreadScheduledExecutor();
            passiveNodeOutputScheduledFuture = passiveNodeOutputSchedulerService.scheduleAtFixedRate(
                    new PassiveNodeOutputSyncManager(clusterCoordinator, sinkHandlerManager,
                            recordTableHandlerManager, activeNodeHost, activeNodePort, liveSyncEnabled,
                            username, password),
                    outputSyncInterval, outputSyncInterval, TimeUnit.MILLISECONDS);
        }

        NodeInfo nodeInfo = StreamProcessorDataHolder.getNodeInfo();
        nodeInfo.setMode(DeploymentMode.MINIMUM_HA);
        nodeInfo.setNodeId(nodeId);
        nodeInfo.setGroupId(clusterId);
        nodeInfo.setActiveNode(isActiveNode);
    }

    /**
     * Promotes this node to active: publishes the local address as the active node address,
     * stops the passive-node output sync task, cancels the pending grace-period and retry sync
     * timers and, when live sync is disabled, starts the active-node output sync task if it is
     * not already running.
     */
    public void changeToActive() {
        isActiveNode = true;
        publishLocalAddressAsActive();

        if (passiveNodeOutputScheduledFuture != null) {
            // cancel(false): a run that is already in progress is allowed to finish.
            passiveNodeOutputScheduledFuture.cancel(false);
        }
        if (passiveNodeOutputSchedulerService != null) {
            passiveNodeOutputSchedulerService.shutdown();
        }
        if (syncAfterGracePeriodTimer != null) {
            syncAfterGracePeriodTimer.cancel();
            syncAfterGracePeriodTimer.purge();
        }
        for (Timer retryTimer : retrySiddhiAppSyncTimerList) {
            retryTimer.cancel();
            retryTimer.purge();
        }
        // Cancelled timers cannot be reused, so drop the references.
        retrySiddhiAppSyncTimerList.clear();

        if (!liveSyncEnabled && !isActiveNodeOutputSyncManagerStarted) {
            startActiveNodeOutputSyncManager();
        }
        StreamProcessorDataHolder.getNodeInfo().setActiveNode(isActiveNode);
    }

    /**
     * Blocks until the cluster coordinator reports a leader node, polling every
     * {@link #LEADER_POLL_INTERVAL_MILLIS} milliseconds. An interruption does not abort the wait;
     * it is logged and the thread's interrupt status is restored once a leader is known so
     * that callers can still observe it.
     */
    private void waitForLeaderNode() {
        boolean interrupted = false;
        try {
            while (clusterCoordinator.getLeaderNode() == null) {
                try {
                    Thread.sleep(LEADER_POLL_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    interrupted = true;
                    log.warn("Error in waiting for leader node", e);
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Stores this node's host and port in the shared properties map and hands it to the coordinator. */
    private void publishLocalAddressAsActive() {
        activeNodePropertiesMap.put(HOST_PROPERTY, localHost);
        activeNodePropertiesMap.put(PORT_PROPERTY, localPort);
        clusterCoordinator.setPropertiesMap(activeNodePropertiesMap);
    }

    /**
     * Schedules the active-node output sync task immediately and then every
     * {@code outputSyncInterval} milliseconds, and records that it has been started.
     */
    private void startActiveNodeOutputSyncManager() {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
                new ActiveNodeOutputSyncManager(sinkHandlerManager, recordTableHandlerManager, clusterCoordinator),
                0L, outputSyncInterval, TimeUnit.MILLISECONDS);
        isActiveNodeOutputSyncManagerStarted = true;
    }

    /**
     * Schedules a one-shot task that, after the grace period, switches all registered source
     * handlers to event collection, fetches the active node's snapshot over HTTP and restores
     * every locally deployed Siddhi application for which a snapshot entry exists.
     *
     * @param gracePeriod delay before the task runs, in milliseconds
     * @return the timer running the task, so that it can be cancelled
     */
    private Timer liveSyncAfterGracePeriod(final int gracePeriod) {
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                log.info("Passive Node: Borrowing state from active node after " + (gracePeriod / 1000)
                        + " seconds");
                for (SourceHandler sourceHandler : sourceHandlerManager.getRegsiteredSourceHandlers().values()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Setting source handler with ID " + sourceHandler.getElementId()
                                + " to collect events in buffer");
                    }
                    ((HACoordinationSourceHandler) sourceHandler).collectEvents(true);
                }

                HAStateSyncObject activeNodeSnapshot = getActiveNodeSnapshot(activeNodeHost, activeNodePort);
                if (activeNodeSnapshot == null || !activeNodeSnapshot.hasState()) {
                    // NOTE(review): source handlers stay in collect mode on this path - confirm intended.
                    return;
                }

                // Entries that fail to decompress are logged and skipped; the corresponding
                // application is then reported below as having no snapshot.
                Map<String, byte[]> decompressedSnapshots = new HashMap<String, byte[]>();
                for (Map.Entry<String, byte[]> compressed : activeNodeSnapshot.getSnapshotMap().entrySet()) {
                    try {
                        decompressedSnapshots.put(compressed.getKey(),
                                CompressionUtil.decompressGZIP(compressed.getValue()));
                    } catch (IOException e) {
                        log.error("Passive Node: Error decompressing bytes of active nodes state. "
                                + e.getMessage(), e);
                    }
                }

                for (SiddhiAppRuntime siddhiAppRuntime
                        : StreamProcessorDataHolder.getSiddhiManager().getSiddhiAppRuntimeMap().values()) {
                    String appName = siddhiAppRuntime.getName();
                    byte[] snapshot = decompressedSnapshots.get(appName);
                    if (snapshot == null) {
                        log.warn("Passive Node: No Snapshot found for Siddhi Application " + appName
                                + " while trying live sync with active node after specified grace period");
                        continue;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Passive Node: Restoring state of Siddhi Application " + appName
                                + " of passive node while live syncing after specified grace period");
                    }
                    try {
                        siddhiAppRuntime.restore(snapshot);
                    } catch (CannotRestoreSiddhiAppStateException e) {
                        log.error("Error in restoring Siddhi app " + appName, e);
                    }
                    // NOTE(review): the node is marked in sync even when the restore above failed;
                    // kept as-is because other components may rely on it - confirm intended.
                    StreamProcessorDataHolder.getNodeInfo().setLastSyncedTimestamp(System.currentTimeMillis());
                    StreamProcessorDataHolder.getNodeInfo().setInSync(true);
                }
            }
        }, gracePeriod);
        return timer;
    }

    /**
     * Schedules a one-shot task that, after the grace period, asks the Siddhi manager to restore
     * the last persisted state and records the sync timestamp.
     *
     * @param gracePeriod delay before the task runs, in milliseconds
     * @return the timer running the task, so that it can be cancelled
     */
    private Timer persistenceStoreSyncAfterGracePeriod(int gracePeriod) {
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                StreamProcessorDataHolder.getSiddhiManager().restoreLastState();
                StreamProcessorDataHolder.getNodeInfo().setLastSyncedTimestamp(System.currentTimeMillis());
            }
        }, gracePeriod);
        return timer;
    }

    /**
     * Requests {@code http://<host>:<port>/ha/state} from the given node, authenticating with the
     * configured live-sync credentials, and parses the JSON response.
     *
     * @param host host name of the active node
     * @param port port of the active node as a decimal string
     * @return the parsed state object; may be {@code null} if the response yields no object
     * @throws NumberFormatException if {@code port} is not a valid integer
     */
    public HAStateSyncObject getActiveNodeSnapshot(String host, String port) {
        URI stateUri = URI.create(String.format("http://%s:%d/ha/state", host, Integer.parseInt(port)));
        return new Gson().fromJson(RequestUtil.sendRequest(stateUri, username, password), HAStateSyncObject.class);
    }

    /**
     * Fetches the state of a single Siddhi application from the current leader node. The leader's
     * address is re-read from the coordinator and cached in this manager before the request.
     *
     * @param siddhiAppName name of the Siddhi application whose state is requested
     * @return the state received from the leader, or a state object constructed with
     *         {@code false} when this node is itself the leader, the leader has no properties
     *         map, or the response yields no object
     * @throws NumberFormatException if the leader's published port is not a valid integer
     */
    public HAStateSyncObject getActiveNodeSiddhiAppSnapshot(String siddhiAppName) {
        if (clusterCoordinator.isLeaderNode()) {
            log.error("Illegal getActiveNodeSiddhiAppSnapshot Called from Active Node");
            return new HAStateSyncObject(false);
        }
        Map<?, ?> leaderProperties = clusterCoordinator.getLeaderNode().getPropertiesMap();
        if (leaderProperties == null) {
            log.error("Leader Node Host and Port is Not Set!");
            return new HAStateSyncObject(false);
        }
        activeNodeHost = (String) leaderProperties.get(HOST_PROPERTY);
        activeNodePort = (String) leaderProperties.get(PORT_PROPERTY);

        // The application name is passed as a format argument, never as part of the format
        // string, so that a '%' in the name cannot be misread as a format specifier.
        URI appStateUri = URI.create(String.format("http://%s:%d/ha/state/%s",
                activeNodeHost, Integer.parseInt(activeNodePort), siddhiAppName));
        HAStateSyncObject appState = new Gson().fromJson(
                RequestUtil.sendRequest(appStateUri, username, password), HAStateSyncObject.class);
        return appState != null ? appState : new HAStateSyncObject(false);
    }

    /**
     * @return {@code true} if this node currently acts as the active node
     */
    public boolean isActiveNode() {
        return isActiveNode;
    }

    /**
     * @return {@code true} if live state sync is enabled in the deployment configuration
     */
    public boolean isLiveStateSyncEnabled() {
        return liveSyncEnabled;
    }

    /**
     * Registers a retry timer so that it is cancelled when this node becomes active.
     *
     * @param timer timer to cancel in {@link #changeToActive()}
     */
    public void addRetrySiddhiAppSyncTimer(Timer timer) {
        retrySiddhiAppSyncTimerList.add(timer);
    }

    /**
     * @return the shared, mutable map holding the host and port last published by this node as
     *         the active node; empty until this node has been active
     */
    public static Map<String, Object> getActiveNodePropertiesMap() {
        return activeNodePropertiesMap;
    }
}
