/*
 * Decompiled with CFR 0.152.
 */
package org.apache.stratos.cep.extension;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.apache.stratos.cep.extension.CEPTopologyEventReceiver;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
import org.apache.stratos.messaging.domain.topology.MemberStatus;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.MessagingUtil;
import org.wso2.siddhi.core.config.SiddhiContext;
import org.wso2.siddhi.core.event.StreamEvent;
import org.wso2.siddhi.core.event.in.InEvent;
import org.wso2.siddhi.core.event.in.InListEvent;
import org.wso2.siddhi.core.query.QueryPostProcessingElement;
import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
import org.wso2.siddhi.core.snapshot.ThreadBarrier;
import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerElement;
import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.expression.Expression;
import org.wso2.siddhi.query.api.expression.Variable;
import org.wso2.siddhi.query.api.expression.constant.IntConstant;
import org.wso2.siddhi.query.api.expression.constant.LongConstant;
import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;

@SiddhiExtension(namespace="stratos", function="faultHandling")
public class FaultHandlingWindowProcessor
extends WindowProcessor
implements RunnableWindowProcessor {
    private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
    private static final int TIME_OUT = 60000;
    public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool";
    public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10;
    private ExecutorService executorService;
    private ScheduledExecutorService faultHandleScheduler;
    private ScheduledFuture<?> lastSchedule;
    private ThreadBarrier threadBarrier;
    private long timeToKeep;
    private ISchedulerSiddhiQueue<StreamEvent> window;
    private EventPublisher healthStatPublisher = EventPublisherPool.getPublisher((String)MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName());
    private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
    private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
    private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap();
    private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
    private int memberIdAttrIndex;

    protected void processEvent(InEvent event) {
        this.addDataToMap(event);
    }

    protected void processEvent(InListEvent listEvent) {
        int size = listEvent.getActiveEvents();
        for (int i = 0; i < size; ++i) {
            this.addDataToMap((InEvent)listEvent.getEvent(i));
        }
    }

    protected void addDataToMap(InEvent event) {
        String id = (String)event.getData()[this.memberIdAttrIndex];
        Member member = this.getMemberFromId(id);
        if (null == member) {
            log.debug((Object)"Member not found in the topology. Event rejected");
            return;
        }
        if (StringUtils.isNotEmpty((CharSequence)id)) {
            this.memberTimeStampMap.put(id, event.getTimeStamp());
        } else {
            log.warn((Object)"NULL member id found in the event received. Event rejected.");
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp()));
        }
    }

    public Iterator<StreamEvent> iterator() {
        return this.window.iterator();
    }

    public Iterator<StreamEvent> iterator(String predicate) {
        if (this.siddhiContext.isDistributedProcessingEnabled()) {
            return ((SchedulerSiddhiQueueGrid)this.window).iterator(predicate);
        }
        return this.window.iterator();
    }

    boolean loadTimeStampMapFromTopology(Topology topology) {
        long currentTimeStamp = System.currentTimeMillis();
        if (topology == null || topology.getServices() == null) {
            return false;
        }
        for (Service service : topology.getServices()) {
            if (service.getClusters() == null) continue;
            for (Cluster cluster : service.getClusters()) {
                if (cluster.getMembers() == null) continue;
                for (Member member : cluster.getMembers()) {
                    if (member == null || !MemberStatus.Active.equals((Object)member.getStatus())) continue;
                    this.memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
                }
            }
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("Member timestamps were successfully loaded from the topology: [timestamps] " + this.memberTimeStampMap));
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Member getMemberFromId(String memberId) {
        if (StringUtils.isEmpty((CharSequence)memberId)) {
            return null;
        }
        if (TopologyManager.getTopology().isInitialized()) {
            try {
                TopologyManager.acquireReadLock();
                if (TopologyManager.getTopology().getServices() == null) {
                    Member member = null;
                    return member;
                }
                for (Service service : TopologyManager.getTopology().getServices()) {
                    if (service.getClusters() == null) continue;
                    for (Cluster cluster : service.getClusters()) {
                        if (cluster.getMembers() == null) continue;
                        for (Member member : cluster.getMembers()) {
                            if (!memberId.equals(member.getMemberId())) continue;
                            Member member2 = member;
                            return member2;
                        }
                    }
                }
            }
            catch (Exception e) {
                log.error((Object)("Error while reading topology" + e));
            }
            finally {
                TopologyManager.releaseReadLock();
            }
        }
        return null;
    }

    private void publishMemberFault(String memberId) {
        Member member = this.getMemberFromId(memberId);
        if (member == null) {
            log.warn((Object)("Failed to publish member fault event. Member having [member-id] " + memberId + " does not exist in topology"));
            return;
        }
        log.info((Object)("Publishing member fault event for [member-id] " + memberId));
        MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(), member.getMemberId(), member.getPartitionId(), member.getNetworkPartitionId(), 0.0f);
        this.memberFaultEventMessageMap.put("message", memberFaultEvent);
        this.healthStatPublisher.publish(this.MemberFaultEventMap, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        try {
            this.threadBarrier.pass();
            Iterator<Map.Entry<String, Long>> i$ = this.memberTimeStampMap.entrySet().iterator();
            while (i$.hasNext()) {
                Long eventTimeStamp;
                Map.Entry<String, Long> o;
                Map.Entry<String, Long> pair = o = i$.next();
                long currentTime = System.currentTimeMillis();
                if (currentTime - (eventTimeStamp = pair.getValue()) <= 60000L) continue;
                log.info((Object)("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + eventTimeStamp + " [time-out] " + 60000 + " milliseconds"));
                this.publishMemberFault(pair.getKey());
            }
            if (log.isDebugEnabled()) {
                log.debug((Object)("Fault handling processor iteration completed with [time-stamp map length] " + this.memberTimeStampMap.size() + " [time-stamp map] " + this.memberTimeStampMap));
            }
        }
        catch (Throwable t) {
            log.error((Object)t.getMessage(), t);
        }
        finally {
            if (this.lastSchedule != null) {
                this.lastSchedule.cancel(false);
            }
            this.lastSchedule = this.faultHandleScheduler.schedule((Runnable)((Object)this), this.timeToKeep, TimeUnit.MILLISECONDS);
        }
    }

    protected Object[] currentState() {
        return new Object[]{this.window.currentState()};
    }

    protected void restoreState(Object[] data) {
        this.window.restoreState(data);
        this.window.reSchedule();
    }

    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
        this.timeToKeep = parameters[0] instanceof IntConstant ? (long)((IntConstant)parameters[0]).getValue().intValue() : ((LongConstant)parameters[0]).getValue();
        String memberIdAttrName = ((Variable)parameters[1]).getAttributeName();
        this.memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName);
        this.window = this.siddhiContext.isDistributedProcessingEnabled() ? new SchedulerSiddhiQueueGrid(elementId, (SchedulerElement)this, this.siddhiContext, this.async) : new SchedulerSiddhiQueue((SchedulerElement)this);
        this.MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", this.memberFaultEventMessageMap);
        this.executorService = StratosThreadPool.getExecutorService((String)CEP_EXTENSION_THREAD_POOL_KEY, (int)10);
        this.cepTopologyEventReceiver.setExecutorService(this.executorService);
        this.cepTopologyEventReceiver.execute();
        this.window.schedule();
        if (log.isDebugEnabled()) {
            log.debug((Object)("Fault handling window processor initialized with [timeToKeep] " + this.timeToKeep + ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + this.memberIdAttrIndex + ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled()));
        }
    }

    public void schedule() {
        if (this.lastSchedule != null) {
            this.lastSchedule.cancel(false);
        }
        this.lastSchedule = this.faultHandleScheduler.schedule((Runnable)((Object)this), this.timeToKeep, TimeUnit.MILLISECONDS);
    }

    public void scheduleNow() {
        if (this.lastSchedule != null) {
            this.lastSchedule.cancel(false);
        }
        this.lastSchedule = this.faultHandleScheduler.schedule((Runnable)((Object)this), 0L, TimeUnit.MILLISECONDS);
    }

    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
        this.faultHandleScheduler = scheduledExecutorService;
    }

    public void setThreadBarrier(ThreadBarrier threadBarrier) {
        this.threadBarrier = threadBarrier;
    }

    public void destroy() {
        this.cepTopologyEventReceiver.terminate();
        this.window = null;
        if (this.executorService != null) {
            try {
                this.executorService.shutdownNow();
            }
            catch (Exception e) {
                log.warn((Object)"An error occurred while shutting down cep extension executor service", (Throwable)e);
            }
        }
    }

    public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
        return this.memberTimeStampMap;
    }
}

