package org.wso2.carbon.stream.processor.core.ha;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.carbon.cluster.coordinator.commons.MemberEventListener;
import org.wso2.carbon.cluster.coordinator.commons.node.NodeDetail;
import org.wso2.carbon.cluster.coordinator.service.ClusterCoordinator;
import org.wso2.carbon.stream.processor.core.ha.util.HAConstants;
import org.wso2.carbon.stream.processor.core.internal.StreamProcessorDataHolder;
import org.wso2.carbon.stream.processor.core.persistence.PersistenceManager;
import org.wso2.siddhi.core.stream.input.source.SourceHandler;
import org.wso2.siddhi.core.table.record.RecordTableHandler;
import org.wso2.siddhi.core.util.transport.BackoffRetryCounter;

/* loaded from: input_file:org/wso2/carbon/stream/processor/core/ha/HAEventListener.class */
public class HAEventListener extends MemberEventListener {
    private static final Logger log = LoggerFactory.getLogger(HAEventListener.class);
    private BackoffRetryCounter backoffRetryCounter = new BackoffRetryCounter();

    public void memberAdded(NodeDetail nodeDetail) {
        ClusterCoordinator clusterCoordinator = StreamProcessorDataHolder.getClusterCoordinator();
        HAManager hAManager = StreamProcessorDataHolder.getHAManager();
        if (!clusterCoordinator.isLeaderNode()) {
            if (hAManager.getNodeId().equals(nodeDetail.getNodeId())) {
                HashMap hashMap = new HashMap();
                hashMap.put(HAConstants.HOST, hAManager.getDeploymentConfig().eventSyncServerConfigs().getHost());
                hashMap.put("port", Integer.valueOf(hAManager.getDeploymentConfig().eventSyncServerConfigs().getPort()));
                hashMap.put("advertisedHost", hAManager.getDeploymentConfig().eventSyncServerConfigs().getAdvertisedHost());
                hashMap.put("advertisedPort", Integer.valueOf(hAManager.getDeploymentConfig().eventSyncServerConfigs().getAdvertisedPort()));
                clusterCoordinator.setPropertiesMap(hashMap);
                return;
            }
            return;
        }
        if (hAManager.getNodeId().equals(nodeDetail.getNodeId())) {
            return;
        }
        HashMap regsiteredSourceHandlers = StreamProcessorDataHolder.getSourceHandlerManager().getRegsiteredSourceHandlers();
        Map map = null;
        long passiveNodeDetailsWaitTimeOutMillis = hAManager.getDeploymentConfig().getPassiveNodeDetailsWaitTimeOutMillis();
        long j = 0;
        boolean z = false;
        do {
            Iterator it = clusterCoordinator.getAllNodeDetails().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                NodeDetail nodeDetail2 = (NodeDetail) it.next();
                if (!nodeDetail2.getNodeId().equals(hAManager.getNodeId())) {
                    map = nodeDetail2.getPropertiesMap();
                    if (null == map) {
                        if (j == 0) {
                            try {
                                j = System.currentTimeMillis();
                            } catch (InterruptedException e) {
                                log.error("Error occurred while waiting for passive node property map to available. " + e.getMessage(), e);
                            }
                        } else if (System.currentTimeMillis() - j > passiveNodeDetailsWaitTimeOutMillis) {
                            log.error("Wait time out " + passiveNodeDetailsWaitTimeOutMillis + " milliseconds exceeded. Active node could not retrieve passive node details from database");
                            z = true;
                        } else {
                            log.warn("Passive node properties Map is null. Waiting " + hAManager.getDeploymentConfig().getPassiveNodeDetailsRetrySleepTimeMillis() + " milliseconds till it available");
                            Thread.sleep(hAManager.getDeploymentConfig().getPassiveNodeDetailsRetrySleepTimeMillis());
                        }
                    }
                }
            }
            if (z) {
                break;
            }
        } while (map == null);
        if (null != map) {
            hAManager.setPassiveNodeAdded(true);
            hAManager.setPassiveNodeHostPort(getHost(map), getPort(map));
            hAManager.initializeEventSyncConnectionPool();
            Iterator it2 = regsiteredSourceHandlers.values().iterator();
            while (it2.hasNext()) {
                ((HACoordinationSourceHandler) ((SourceHandler) it2.next())).setPassiveNodeAdded(true);
            }
            new PersistenceManager().run();
        }
    }

    public void memberRemoved(NodeDetail nodeDetail) {
        ClusterCoordinator clusterCoordinator = StreamProcessorDataHolder.getClusterCoordinator();
        HAManager hAManager = StreamProcessorDataHolder.getHAManager();
        if (!clusterCoordinator.isLeaderNode() || hAManager.getNodeId().equals(nodeDetail.getNodeId())) {
            return;
        }
        HashMap regsiteredSourceHandlers = StreamProcessorDataHolder.getSourceHandlerManager().getRegsiteredSourceHandlers();
        StreamProcessorDataHolder.getHAManager().setPassiveNodeAdded(false);
        Iterator it = regsiteredSourceHandlers.values().iterator();
        while (it.hasNext()) {
            ((HACoordinationSourceHandler) ((SourceHandler) it.next())).setPassiveNodeAdded(false);
        }
    }

    public void coordinatorChanged(NodeDetail nodeDetail) {
        ClusterCoordinator clusterCoordinator = StreamProcessorDataHolder.getClusterCoordinator();
        if (clusterCoordinator != null) {
            Map registeredSinkHandlers = StreamProcessorDataHolder.getSinkHandlerManager().getRegisteredSinkHandlers();
            HashMap regsiteredSourceHandlers = StreamProcessorDataHolder.getSourceHandlerManager().getRegsiteredSourceHandlers();
            Map registeredRecordTableHandlers = StreamProcessorDataHolder.getRecordTableHandlerManager().getRegisteredRecordTableHandlers();
            synchronized (this) {
                if (clusterCoordinator.isLeaderNode()) {
                    for (SourceHandler sourceHandler : regsiteredSourceHandlers.values()) {
                        try {
                            ((HACoordinationSourceHandler) sourceHandler).setAsActive();
                        } catch (Throwable th) {
                            log.error("HA Deployment: Error when connecting to source " + sourceHandler.getElementId() + " while changing from passive state to active, skipping the source. ", th);
                        }
                    }
                    StreamProcessorDataHolder.getHAManager().changeToActive();
                } else if (StreamProcessorDataHolder.getHAManager().isActiveNode()) {
                    StreamProcessorDataHolder.getHAManager().changeToPassive();
                    Iterator it = registeredSinkHandlers.entrySet().iterator();
                    while (it.hasNext()) {
                        ((HACoordinationSinkHandler) ((Map.Entry) it.next()).getValue()).setAsPassive();
                    }
                    Iterator it2 = regsiteredSourceHandlers.values().iterator();
                    while (it2.hasNext()) {
                        ((HACoordinationSourceHandler) ((SourceHandler) it2.next())).setAsPassive();
                    }
                    Iterator it3 = registeredRecordTableHandlers.values().iterator();
                    while (it3.hasNext()) {
                        ((HACoordinationRecordTableHandler) ((RecordTableHandler) it3.next())).setAsPassive();
                    }
                }
            }
        }
    }

    public void becameUnresponsive(String str) {
        if (StreamProcessorDataHolder.getHAManager().isActiveNode()) {
            Map registeredSinkHandlers = StreamProcessorDataHolder.getSinkHandlerManager().getRegisteredSinkHandlers();
            HashMap regsiteredSourceHandlers = StreamProcessorDataHolder.getSourceHandlerManager().getRegsiteredSourceHandlers();
            Map registeredRecordTableHandlers = StreamProcessorDataHolder.getRecordTableHandlerManager().getRegisteredRecordTableHandlers();
            StreamProcessorDataHolder.getHAManager().changeToPassive();
            Iterator it = registeredSinkHandlers.entrySet().iterator();
            while (it.hasNext()) {
                ((HACoordinationSinkHandler) ((Map.Entry) it.next()).getValue()).setAsPassive();
            }
            Iterator it2 = regsiteredSourceHandlers.values().iterator();
            while (it2.hasNext()) {
                ((HACoordinationSourceHandler) ((SourceHandler) it2.next())).setAsPassive();
            }
            Iterator it3 = registeredRecordTableHandlers.values().iterator();
            while (it3.hasNext()) {
                ((HACoordinationRecordTableHandler) ((RecordTableHandler) it3.next())).setAsPassive();
            }
        }
    }

    private String getHost(Map map) {
        Object obj = map.get("advertisedHost");
        if (obj == null) {
            obj = map.get(HAConstants.HOST);
        }
        return (String) obj;
    }

    private int getPort(Map map) {
        int i = 0;
        try {
            i = ((Integer) map.get("advertisedPort")).intValue();
        } catch (Exception e) {
            log.warn("Error in getting the advertisedPort from deployment yaml. Hence using port as the advertisedPort" + e.getMessage());
        }
        if (i == 0) {
            i = ((Integer) map.get("port")).intValue();
        }
        return i;
    }
}
