package org.wso2.carbon.stream.processor.core.ha;

import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.carbon.cluster.coordinator.commons.MemberEventListener;
import org.wso2.carbon.cluster.coordinator.commons.node.NodeDetail;
import org.wso2.carbon.cluster.coordinator.service.ClusterCoordinator;
import org.wso2.carbon.stream.processor.core.internal.StreamProcessorDataHolder;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.source.SourceHandler;
import org.wso2.siddhi.core.stream.output.sink.SinkHandler;
import org.wso2.siddhi.core.table.record.RecordTableHandler;
import org.wso2.siddhi.core.util.transport.BackoffRetryCounter;

/* loaded from: input_file:org/wso2/carbon/stream/processor/core/ha/HAEventListener.class */
public class HAEventListener extends MemberEventListener {
    private static final Logger log = LoggerFactory.getLogger(HAEventListener.class);
    private BackoffRetryCounter backoffRetryCounter = new BackoffRetryCounter();

    public void memberAdded(NodeDetail nodeDetail) {
    }

    public void memberRemoved(NodeDetail nodeDetail) {
    }

    public void coordinatorChanged(NodeDetail nodeDetail) {
        ClusterCoordinator clusterCoordinator = StreamProcessorDataHolder.getClusterCoordinator();
        if (clusterCoordinator == null || !clusterCoordinator.isLeaderNode()) {
            return;
        }
        log.info("HA Deployment: This Node is now the Active Node");
        StreamProcessorDataHolder.getHAManager().changeToActive();
        Iterator it = StreamProcessorDataHolder.getSinkHandlerManager().getRegisteredSinkHandlers().values().iterator();
        while (it.hasNext()) {
            ((HACoordinationSinkHandler) ((SinkHandler) it.next())).setAsActive();
        }
        Iterator it2 = StreamProcessorDataHolder.getSourceHandlerManager().getRegsiteredSourceHandlers().values().iterator();
        while (it2.hasNext()) {
            ((HACoordinationSourceHandler) ((SourceHandler) it2.next())).setAsActive();
        }
        for (RecordTableHandler recordTableHandler : StreamProcessorDataHolder.getRecordTableHandlerManager().getRegisteredRecordTableHandlers().values()) {
            try {
                ((HACoordinationRecordTableHandler) recordTableHandler).setAsActive();
            } catch (ConnectionUnavailableException e) {
                this.backoffRetryCounter.reset();
                log.error("HA Deployment: Error in connecting to table " + ((HACoordinationRecordTableHandler) recordTableHandler).getTableId() + " while changing from passive state to active, will retry in " + this.backoffRetryCounter.getTimeInterval(), e);
                ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
                this.backoffRetryCounter.increment();
                newSingleThreadScheduledExecutor.schedule(new RetryRecordTableConnection(this.backoffRetryCounter, recordTableHandler, newSingleThreadScheduledExecutor), this.backoffRetryCounter.getTimeIntervalMillis(), TimeUnit.MILLISECONDS);
            }
        }
    }
}
