package org.wso2.carbon.stream.processor.core.ha;

import com.google.gson.Gson;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.carbon.cluster.coordinator.service.ClusterCoordinator;
import org.wso2.carbon.stream.processor.core.ha.util.CoordinationConstants;
import org.wso2.carbon.stream.processor.core.ha.util.RequestUtil;
import org.wso2.carbon.stream.processor.core.model.OutputSyncTimestampCollection;
import org.wso2.carbon.stream.processor.core.model.OutputSyncTimestamps;
import org.wso2.siddhi.core.stream.output.sink.SinkHandlerManager;
import org.wso2.siddhi.core.table.record.RecordTableHandlerManager;

/* loaded from: input_file:org/wso2/carbon/stream/processor/core/ha/PassiveNodeOutputSyncManager.class */
/**
 * Periodic task for the passive node of an HA deployment. On each run it obtains the
 * timestamps reported by the active node and asks every locally registered
 * {@link HACoordinationSinkHandler} and {@link HACoordinationRecordTableHandler} to trim its
 * queue up to the corresponding timestamp.
 *
 * <p>Two sources are supported, selected by {@code liveSyncEnabled}:
 * <ul>
 *   <li>enabled: an HTTP call to {@code /ha/outputSyncTimestamps} on the active node;</li>
 *   <li>disabled: the properties map of the leader node from the {@link ClusterCoordinator}.</li>
 * </ul>
 *
 * <p>NOTE(review): exceptions raised by the HTTP request, by port parsing or by the handlers
 * are not caught here and propagate out of {@link #run()} — confirm the scheduler that drives
 * this task tolerates that.
 */
public class PassiveNodeOutputSyncManager implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(PassiveNodeOutputSyncManager.class);
    private final boolean liveSyncEnabled;
    private final String activeNodeHost;
    private final String activeNodePort;
    private final ClusterCoordinator clusterCoordinator;
    private final SinkHandlerManager sinkHandlerManager;
    private final RecordTableHandlerManager recordTableHandlerManager;
    private final String username;
    private final String password;

    /**
     * @param clusterCoordinator        coordinator queried for the leader node when live sync is off
     * @param sinkHandlerManager        source of the locally registered sink handlers
     * @param recordTableHandlerManager source of the locally registered record table handlers
     * @param activeNodeHost            host of the active node, used to build the live sync URL
     * @param activeNodePort            port of the active node; must parse as an integer
     * @param liveSyncEnabled           {@code true} to fetch timestamps over HTTP, {@code false}
     *                                  to read them from the leader node properties
     * @param username                  user name passed to {@link RequestUtil#sendRequest}
     * @param password                  password passed to {@link RequestUtil#sendRequest}
     */
    public PassiveNodeOutputSyncManager(ClusterCoordinator clusterCoordinator, SinkHandlerManager sinkHandlerManager, RecordTableHandlerManager recordTableHandlerManager, String activeNodeHost, String activeNodePort, boolean liveSyncEnabled, String username, String password) {
        this.activeNodeHost = activeNodeHost;
        this.activeNodePort = activeNodePort;
        this.liveSyncEnabled = liveSyncEnabled;
        this.clusterCoordinator = clusterCoordinator;
        this.sinkHandlerManager = sinkHandlerManager;
        this.recordTableHandlerManager = recordTableHandlerManager;
        this.username = username;
        this.password = password;
    }

    /**
     * Runs one synchronisation pass using the source selected by {@code liveSyncEnabled}.
     */
    @Override // java.lang.Runnable
    public void run() {
        if (this.liveSyncEnabled) {
            syncFromActiveNodeEndpoint();
        } else {
            syncFromLeaderNodeProperties();
        }
    }

    /**
     * Fetches the timestamps from the active node over HTTP and trims the local queues.
     *
     * @throws NumberFormatException if the configured port is not an integer
     */
    private void syncFromActiveNodeEndpoint() {
        URI endpoint = URI.create(String.format("http://%s:%d/ha/outputSyncTimestamps",
                this.activeNodeHost, Integer.parseInt(this.activeNodePort)));
        String response = RequestUtil.sendRequest(endpoint, this.username, this.password);
        log.debug("Passive Node: Accessed active node to retrieve last published timestamps.");

        OutputSyncTimestampCollection timestamps =
                new Gson().fromJson(response, OutputSyncTimestampCollection.class);
        if (timestamps == null) {
            // Gson yields null for a null or empty document; there is nothing to trim against.
            log.warn("Passive Node: Active node returned no output sync timestamps. Skipping queue trim.");
            return;
        }

        Map<?, ?> sinkHandlers = this.sinkHandlerManager.getRegisteredSinkHandlers();
        if (sinkHandlers.size() != timestamps.sizeOfSinkTimestamps()) {
            log.warn("Passive Node: Active node and Passive node do not have same amount of sink handlers.");
        }
        for (OutputSyncTimestamps sinkTimestamp : timestamps.getLastPublishedTimestamps()) {
            log.debug("Passive Node: Updating publisher queue for sink {} with timestamp {}. Live state sync on",
                    sinkTimestamp.getId(), sinkTimestamp.getTimestamp());
            trimSinkQueue(sinkHandlers, sinkTimestamp.getId(), sinkTimestamp.getTimestamp().longValue());
        }

        Map<?, ?> recordTableHandlers = this.recordTableHandlerManager.getRegisteredRecordTableHandlers();
        if (recordTableHandlers.size() != timestamps.sizeOfRecordTableTimestamps()) {
            log.warn("Passive Node: Active node and Passive node do not have same amount of record table handlers.");
        }
        for (OutputSyncTimestamps tableTimestamp : timestamps.getRecordTableLastUpdatedTimestamps()) {
            log.debug("Passive Node: Updating record table queue for record table {} with timestamp {}. Live state sync on",
                    tableTimestamp.getId(), tableTimestamp.getTimestamp());
            trimRecordTableQueue(recordTableHandlers, tableTimestamp.getId(),
                    tableTimestamp.getTimestamp().longValue());
        }
    }

    /**
     * Reads the timestamps from the leader node's properties map and trims the local queues.
     * Does nothing when the leader node exposes no properties map.
     */
    private void syncFromLeaderNodeProperties() {
        Map<?, ?> properties = this.clusterCoordinator.getLeaderNode().getPropertiesMap();
        if (properties == null) {
            return;
        }

        log.debug("Passive Node: Accessed active node properties to retrieve last published timestamps.");
        Map<?, ?> sinkTimestamps = (Map<?, ?>) properties.get(CoordinationConstants.ACTIVE_PROCESSED_LAST_TIMESTAMPS);
        if (sinkTimestamps == null) {
            // The property may be absent until the active node has published at least once.
            log.warn("Passive Node: Active node properties do not contain last published timestamps. Skipping sink queue trim.");
        } else {
            Map<?, ?> sinkHandlers = this.sinkHandlerManager.getRegisteredSinkHandlers();
            if (sinkHandlers.size() != sinkTimestamps.size()) {
                log.warn("Passive Node: Active node and Passive node do not have same amount of Sinks. Make sure both nodes have deployed same amount of Siddhi Applications.");
            }
            for (Map.Entry<?, ?> entry : sinkTimestamps.entrySet()) {
                log.debug("Passive Node: Updating publisher queue for sink {} with timestamp {}. Live state sync off",
                        entry.getKey(), entry.getValue());
                trimSinkQueue(sinkHandlers, entry.getKey(), ((Long) entry.getValue()).longValue());
            }
        }

        log.debug("Passive Node: Accessed active node properties to retrieve last timestamp of update on record table.");
        Map<?, ?> tableTimestamps = (Map<?, ?>) properties.get(CoordinationConstants.ACTIVE_RECORD_TABLE_LAST_UPDATE_TIMESTAMPS);
        if (tableTimestamps == null) {
            log.warn("Passive Node: Active node properties do not contain record table update timestamps. Skipping record table queue trim.");
            return;
        }
        Map<?, ?> recordTableHandlers = this.recordTableHandlerManager.getRegisteredRecordTableHandlers();
        if (recordTableHandlers.size() != tableTimestamps.size()) {
            log.warn("Passive Node: Active node and Passive node do not have same amount of Record Tables. Make sure both nodes have deployed same amount of Siddhi Applications.");
        }
        for (Map.Entry<?, ?> entry : tableTimestamps.entrySet()) {
            log.debug("Passive Node: Updating record table {} with timestamp {}. Live state sync off",
                    entry.getKey(), entry.getValue());
            trimRecordTableQueue(recordTableHandlers, entry.getKey(), ((Long) entry.getValue()).longValue());
        }
    }

    /** Trims the sink handler registered under {@code id}, if there is one. */
    private static void trimSinkQueue(Map<?, ?> sinkHandlers, Object id, long timestamp) {
        HACoordinationSinkHandler handler = (HACoordinationSinkHandler) sinkHandlers.get(id);
        if (handler != null) {
            handler.trimPassiveNodeEventQueue(timestamp);
        }
    }

    /** Trims the record table handler registered under {@code id}, if there is one. */
    private static void trimRecordTableQueue(Map<?, ?> recordTableHandlers, Object id, long timestamp) {
        HACoordinationRecordTableHandler handler = (HACoordinationRecordTableHandler) recordTableHandlers.get(id);
        if (handler != null) {
            handler.trimRecordTableEventQueue(timestamp);
        }
    }
}
