package org.apache.stratos.cep.extension;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
import org.apache.stratos.messaging.domain.topology.MemberStatus;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.MessagingUtil;
import org.wso2.siddhi.core.config.SiddhiContext;
import org.wso2.siddhi.core.event.StreamEvent;
import org.wso2.siddhi.core.event.in.InEvent;
import org.wso2.siddhi.core.event.in.InListEvent;
import org.wso2.siddhi.core.query.QueryPostProcessingElement;
import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
import org.wso2.siddhi.core.snapshot.ThreadBarrier;
import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.expression.Expression;
import org.wso2.siddhi.query.api.expression.Variable;
import org.wso2.siddhi.query.api.expression.constant.IntConstant;
import org.wso2.siddhi.query.api.expression.constant.LongConstant;
import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;

@SiddhiExtension(namespace = "stratos", function = "faultHandling")
/* loaded from: input_file:org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.class */
public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
    private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
    private static final int TIME_OUT = 60000;
    public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool";
    public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10;
    private ExecutorService executorService;
    private ScheduledExecutorService faultHandleScheduler;
    private ScheduledFuture<?> lastSchedule;
    private ThreadBarrier threadBarrier;
    private long timeToKeep;
    private ISchedulerSiddhiQueue<StreamEvent> window;
    private EventPublisher healthStatPublisher = EventPublisherPool.getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName());
    private Map<String, Object> MemberFaultEventMap = new HashMap();
    private Map<String, Object> memberFaultEventMessageMap = new HashMap();
    private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<>();
    private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
    private int memberIdAttrIndex;

    protected void processEvent(InEvent inEvent) {
        addDataToMap(inEvent);
    }

    protected void processEvent(InListEvent inListEvent) {
        int activeEvents = inListEvent.getActiveEvents();
        for (int i = 0; i < activeEvents; i++) {
            addDataToMap((InEvent) inListEvent.getEvent(i));
        }
    }

    protected void addDataToMap(InEvent inEvent) {
        String str = (String) inEvent.getData()[this.memberIdAttrIndex];
        if (null == getMemberFromId(str)) {
            log.warn("Member not found in the topology. Event rejected");
            return;
        }
        if (StringUtils.isNotEmpty(str)) {
            this.memberTimeStampMap.put(str, Long.valueOf(inEvent.getTimeStamp()));
        } else {
            log.warn("NULL member id found in the event received. Event rejected.");
        }
        if (log.isDebugEnabled()) {
            log.debug("Event received from [member-id] " + str + " [time-stamp] " + inEvent.getTimeStamp());
        }
    }

    public Iterator<StreamEvent> iterator() {
        return this.window.iterator();
    }

    public Iterator<StreamEvent> iterator(String str) {
        return this.siddhiContext.isDistributedProcessingEnabled() ? this.window.iterator(str) : this.window.iterator();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean loadTimeStampMapFromTopology(Topology topology) {
        long currentTimeMillis = System.currentTimeMillis();
        if (topology == null || topology.getServices() == null) {
            return false;
        }
        for (Service service : topology.getServices()) {
            if (service.getClusters() != null) {
                for (Cluster cluster : service.getClusters()) {
                    if (cluster.getMembers() != null) {
                        for (Member member : cluster.getMembers()) {
                            if (member != null && MemberStatus.Active.equals(member.getStatus())) {
                                this.memberTimeStampMap.putIfAbsent(member.getMemberId(), Long.valueOf(currentTimeMillis));
                            }
                        }
                    }
                }
            }
        }
        if (!log.isDebugEnabled()) {
            return true;
        }
        log.debug("Member timestamps were successfully loaded from the topology: [timestamps] " + this.memberTimeStampMap);
        return true;
    }

    private Member getMemberFromId(String str) {
        if (StringUtils.isEmpty(str)) {
            return null;
        }
        try {
            if (!TopologyManager.getTopology().isInitialized()) {
                return null;
            }
            TopologyManager.acquireReadLock();
            if (TopologyManager.getTopology().getServices() == null) {
                TopologyManager.releaseReadLock();
                return null;
            }
            for (Service service : TopologyManager.getTopology().getServices()) {
                if (service.getClusters() != null) {
                    for (Cluster cluster : service.getClusters()) {
                        if (cluster.getMembers() != null) {
                            for (Member member : cluster.getMembers()) {
                                if (str.equals(member.getMemberId())) {
                                    TopologyManager.releaseReadLock();
                                    return member;
                                }
                            }
                        }
                    }
                }
            }
            return null;
        } catch (Exception e) {
            log.error("Error while reading topology" + e);
            return null;
        } finally {
            TopologyManager.releaseReadLock();
        }
    }

    private void publishMemberFault(Member member) {
        if (member == null) {
            log.warn("Failed to publish member fault event. Member object is null");
            return;
        }
        log.info("Publishing member fault event for [member-id] " + member.getMemberId());
        this.memberFaultEventMessageMap.put("message", new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(), member.getMemberId(), member.getPartitionId(), member.getNetworkPartitionId(), 0.0f));
        this.healthStatPublisher.publish(this.MemberFaultEventMap, true);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void run() {
        try {
            try {
                this.threadBarrier.pass();
                for (Map.Entry<String, Long> entry : this.memberTimeStampMap.entrySet()) {
                    long currentTimeMillis = System.currentTimeMillis();
                    Long value = entry.getValue();
                    if (currentTimeMillis - value.longValue() > 60000) {
                        String key = entry.getKey();
                        Member memberFromId = getMemberFromId(key);
                        if (memberFromId != null) {
                            log.info("Faulty member detected [member-id] " + ((Object) entry.getKey()) + " with [last time-stamp] " + value + " [time-out] " + TIME_OUT + " milliseconds");
                            publishMemberFault(memberFromId);
                        }
                        this.memberTimeStampMap.remove(key);
                    }
                }
                if (log.isDebugEnabled()) {
                    log.debug("Fault handling processor iteration completed with [time-stamp map length] " + this.memberTimeStampMap.size() + " [time-stamp map] " + this.memberTimeStampMap);
                }
                if (this.lastSchedule != null) {
                    this.lastSchedule.cancel(false);
                }
                this.lastSchedule = this.faultHandleScheduler.schedule((Runnable) this, this.timeToKeep, TimeUnit.MILLISECONDS);
            } catch (Throwable th) {
                log.error(th.getMessage(), th);
                if (this.lastSchedule != null) {
                    this.lastSchedule.cancel(false);
                }
                this.lastSchedule = this.faultHandleScheduler.schedule((Runnable) this, this.timeToKeep, TimeUnit.MILLISECONDS);
            }
        } catch (Throwable th2) {
            if (this.lastSchedule != null) {
                this.lastSchedule.cancel(false);
            }
            this.lastSchedule = this.faultHandleScheduler.schedule((Runnable) this, this.timeToKeep, TimeUnit.MILLISECONDS);
            throw th2;
        }
    }

    protected Object[] currentState() {
        return new Object[]{this.window.currentState()};
    }

    protected void restoreState(Object[] objArr) {
        this.window.restoreState((Object[]) objArr[0]);
        this.window.reSchedule();
    }

    protected void init(Expression[] expressionArr, QueryPostProcessingElement queryPostProcessingElement, AbstractDefinition abstractDefinition, String str, boolean z, SiddhiContext siddhiContext) {
        if (expressionArr[0] instanceof IntConstant) {
            this.timeToKeep = ((IntConstant) expressionArr[0]).getValue().intValue();
        } else {
            this.timeToKeep = ((LongConstant) expressionArr[0]).getValue().longValue();
        }
        String attributeName = ((Variable) expressionArr[1]).getAttributeName();
        this.memberIdAttrIndex = abstractDefinition.getAttributePosition(attributeName);
        if (this.siddhiContext.isDistributedProcessingEnabled()) {
            this.window = new SchedulerSiddhiQueueGrid(str, this, this.siddhiContext, this.async);
        } else {
            this.window = new SchedulerSiddhiQueue(this);
        }
        this.MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", this.memberFaultEventMessageMap);
        this.executorService = StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, 10);
        this.cepTopologyEventReceiver.setExecutorService(this.executorService);
        this.cepTopologyEventReceiver.execute();
        this.window.schedule();
        if (log.isDebugEnabled()) {
            log.debug("Fault handling window processor initialized with [timeToKeep] " + this.timeToKeep + ", [memberIdAttrName] " + attributeName + ", [memberIdAttrIndex] " + this.memberIdAttrIndex + ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void schedule() {
        if (this.lastSchedule != null) {
            this.lastSchedule.cancel(false);
        }
        this.lastSchedule = this.faultHandleScheduler.schedule((Runnable) this, this.timeToKeep, TimeUnit.MILLISECONDS);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void scheduleNow() {
        if (this.lastSchedule != null) {
            this.lastSchedule.cancel(false);
        }
        this.lastSchedule = this.faultHandleScheduler.schedule((Runnable) this, 0L, TimeUnit.MILLISECONDS);
    }

    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
        this.faultHandleScheduler = scheduledExecutorService;
    }

    public void setThreadBarrier(ThreadBarrier threadBarrier) {
        this.threadBarrier = threadBarrier;
    }

    public void destroy() {
        this.cepTopologyEventReceiver.terminate();
        this.window = null;
        if (this.executorService != null) {
            try {
                this.executorService.shutdownNow();
            } catch (Exception e) {
                log.warn("An error occurred while shutting down cep extension executor service", e);
            }
        }
    }

    public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
        return this.memberTimeStampMap;
    }
}
