package org.wso2.carbon.stream.processor.core.ha;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.carbon.cluster.coordinator.commons.MemberEventListener;
import org.wso2.carbon.cluster.coordinator.commons.node.NodeDetail;
import org.wso2.carbon.cluster.coordinator.service.ClusterCoordinator;
import org.wso2.carbon.stream.processor.core.ha.util.HAConstants;
import org.wso2.carbon.stream.processor.core.internal.StreamProcessorDataHolder;
import org.wso2.carbon.stream.processor.core.persistence.PersistenceManager;
import org.wso2.siddhi.core.stream.input.source.SourceHandler;
import org.wso2.siddhi.core.stream.output.sink.SinkHandler;
import org.wso2.siddhi.core.table.record.RecordTableHandler;
import org.wso2.siddhi.core.util.transport.BackoffRetryCounter;

/* loaded from: input_file:org/wso2/carbon/stream/processor/core/ha/HAEventListener.class */
public class HAEventListener extends MemberEventListener {
    private static final Logger log = LoggerFactory.getLogger(HAEventListener.class);
    private BackoffRetryCounter backoffRetryCounter = new BackoffRetryCounter();

    public void memberAdded(NodeDetail nodeDetail) {
        ClusterCoordinator clusterCoordinator = StreamProcessorDataHolder.getClusterCoordinator();
        if (clusterCoordinator.isLeaderNode()) {
            HashMap regsiteredSourceHandlers = StreamProcessorDataHolder.getSourceHandlerManager().getRegsiteredSourceHandlers();
            HAManager hAManager = StreamProcessorDataHolder.getHAManager();
            hAManager.setPassiveNodeAdded(true);
            Iterator it = regsiteredSourceHandlers.values().iterator();
            while (it.hasNext()) {
                ((HACoordinationSourceHandler) ((SourceHandler) it.next())).setPassiveNodeAdded(true);
            }
            if (!nodeDetail.getNodeId().equals(clusterCoordinator.getLeaderNode().getNodeId()) && nodeDetail.getPropertiesMap() != null) {
                hAManager.setPassiveNodeHostPort(getHost(nodeDetail.getPropertiesMap()), getPort(nodeDetail.getPropertiesMap()));
                hAManager.initializeEventSyncConnectionPool();
            }
            new PersistenceManager().run();
        }
    }

    public void memberRemoved(NodeDetail nodeDetail) {
        HashMap regsiteredSourceHandlers = StreamProcessorDataHolder.getSourceHandlerManager().getRegsiteredSourceHandlers();
        StreamProcessorDataHolder.getHAManager().setPassiveNodeAdded(false);
        Iterator it = regsiteredSourceHandlers.values().iterator();
        while (it.hasNext()) {
            ((HACoordinationSourceHandler) ((SourceHandler) it.next())).setPassiveNodeAdded(false);
        }
    }

    public void coordinatorChanged(NodeDetail nodeDetail) {
        ClusterCoordinator clusterCoordinator = StreamProcessorDataHolder.getClusterCoordinator();
        if (clusterCoordinator != null) {
            Map registeredSinkHandlers = StreamProcessorDataHolder.getSinkHandlerManager().getRegisteredSinkHandlers();
            HashMap regsiteredSourceHandlers = StreamProcessorDataHolder.getSourceHandlerManager().getRegsiteredSourceHandlers();
            Map registeredRecordTableHandlers = StreamProcessorDataHolder.getRecordTableHandlerManager().getRegisteredRecordTableHandlers();
            if (!clusterCoordinator.isLeaderNode()) {
                log.info("HA Deployment: This Node is now the Passive Node");
                StreamProcessorDataHolder.getHAManager().changeToPassive();
                Iterator it = registeredSinkHandlers.entrySet().iterator();
                while (it.hasNext()) {
                    ((HACoordinationSinkHandler) ((Map.Entry) it.next()).getValue()).setAsPassive();
                }
                Iterator it2 = regsiteredSourceHandlers.values().iterator();
                while (it2.hasNext()) {
                    ((HACoordinationSourceHandler) ((SourceHandler) it2.next())).setAsPassive();
                }
                Iterator it3 = registeredRecordTableHandlers.values().iterator();
                while (it3.hasNext()) {
                    ((HACoordinationRecordTableHandler) ((RecordTableHandler) it3.next())).setAsPassive();
                }
                return;
            }
            for (SourceHandler sourceHandler : regsiteredSourceHandlers.values()) {
                try {
                    ((HACoordinationSourceHandler) sourceHandler).setAsActive();
                } catch (Throwable th) {
                    log.error("HA Deployment: Error when connecting to source " + sourceHandler.getElementId() + " while changing from passive state to active, skipping the source. ", th);
                }
            }
            StreamProcessorDataHolder.getHAManager().changeToActive();
            if (clusterCoordinator.getAllNodeDetails().size() == 2) {
                NodeDetail passiveNode = getPassiveNode();
                StreamProcessorDataHolder.getHAManager().setPassiveNodeHostPort(getHost(passiveNode.getPropertiesMap()), getPort(passiveNode.getPropertiesMap()));
                StreamProcessorDataHolder.getHAManager().initializeEventSyncConnectionPool();
            }
            for (SinkHandler sinkHandler : registeredSinkHandlers.values()) {
                try {
                    ((HACoordinationSinkHandler) sinkHandler).setAsActive();
                } catch (Throwable th2) {
                    log.error("HA Deployment: Error when connecting to sink " + sinkHandler.getElementId() + " while changing from passive state to active, skipping the sink. ", th2);
                }
            }
            for (RecordTableHandler recordTableHandler : registeredRecordTableHandlers.values()) {
                try {
                    ((HACoordinationRecordTableHandler) recordTableHandler).setAsActive();
                } catch (Throwable th3) {
                    this.backoffRetryCounter.reset();
                    log.error("HA Deployment: Error in connecting to table " + ((HACoordinationRecordTableHandler) recordTableHandler).getTableId() + " while changing from passive state to active, will retry in " + this.backoffRetryCounter.getTimeInterval(), th3);
                    ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
                    this.backoffRetryCounter.increment();
                    newSingleThreadScheduledExecutor.schedule(new RetryRecordTableConnection(this.backoffRetryCounter, recordTableHandler, newSingleThreadScheduledExecutor), this.backoffRetryCounter.getTimeIntervalMillis(), TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    private String getHost(Map map) {
        Object obj = map.get("advertisedHost");
        if (obj == null) {
            obj = map.get(HAConstants.HOST);
        }
        return (String) obj;
    }

    private int getPort(Map map) {
        int i = 0;
        try {
            i = ((Integer) map.get("advertisedPort")).intValue();
        } catch (Exception e) {
            log.warn("Error in getting the advertisedPort from deployment yaml. Hence using port as the advertisedPort" + e.getMessage());
        }
        if (i == 0) {
            i = ((Integer) map.get("port")).intValue();
        }
        return i;
    }

    private NodeDetail getPassiveNode() {
        NodeDetail leaderNode = StreamProcessorDataHolder.getClusterCoordinator().getLeaderNode();
        return (NodeDetail) StreamProcessorDataHolder.getClusterCoordinator().getAllNodeDetails().stream().filter(nodeDetail -> {
            return !nodeDetail.getNodeId().equals(leaderNode.getNodeId());
        }).findFirst().get();
    }
}
