package org.wso2.carbon.stream.processor.core.ha;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import org.wso2.carbon.stream.processor.core.ha.util.CoordinationConstants;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.input.source.SourceHandler;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

/* loaded from: input_file:org/wso2/carbon/stream/processor/core/ha/HACoordinationSourceHandler.class */
/**
 * Source handler that decides, per incoming event, whether to forward it to the
 * {@link InputHandler} immediately or to hold it in a bounded in-memory buffer.
 *
 * <ul>
 *   <li>When this handler has been marked active ({@link #setAsActive()}), events are forwarded
 *       directly and the timestamp of the last forwarded event is recorded so that it can be
 *       published through {@link #currentState()}.</li>
 *   <li>Otherwise, events are forwarded directly unless collection has been switched on via
 *       {@link #collectEvents(boolean)}; while collecting, events are appended to a bounded queue
 *       and the oldest buffered events are evicted when the queue is full.</li>
 *   <li>{@link #processBufferedEvents(long)} discards buffered events that are not newer than a
 *       given timestamp, forwards the rest and switches collection off.</li>
 * </ul>
 *
 * <p>Buffer mutations performed on behalf of incoming events are guarded by this object's
 * monitor. The mode flags are {@code volatile} because they are written by the control methods
 * and read by the event-delivery methods without a common lock on the active path.
 */
public class HACoordinationSourceHandler extends SourceHandler {
    private static final Logger log = Logger.getLogger(HACoordinationSourceHandler.class);

    /** {@code true} once {@link #setAsActive()} has been called; never reset in this class. */
    private volatile boolean isActiveNode;
    /** {@code true} while events arriving on a non-active node must be buffered. */
    private volatile boolean collectEvents;
    /** Timestamp of the most recent event forwarded while active; 0 until one has been seen. */
    private volatile long lastProcessedEventTimestamp = 0;
    /** Bounded FIFO buffer holding events received while collecting. */
    private final Queue<Event> passiveNodeBufferedEvents;
    private String sourceHandlerElementId;
    /** Maximum number of events the buffer may hold. */
    private final int queueCapacity;

    /**
     * Creates a handler whose buffer holds at most {@code i} events.
     *
     * @param i buffer capacity; must be greater than zero
     * @throws IllegalArgumentException if {@code i} is not greater than zero (raised by
     *                                  {@link LinkedBlockingQueue})
     */
    public HACoordinationSourceHandler(int i) {
        this.queueCapacity = i;
        this.passiveNodeBufferedEvents = new LinkedBlockingQueue<Event>(i);
    }

    /**
     * Stores the element id reported by {@link #getElementId()}.
     *
     * @param str              element id of this source handler
     * @param streamDefinition stream definition; not used by this implementation
     */
    public void init(String str, StreamDefinition streamDefinition) {
        this.sourceHandlerElementId = str;
    }

    /**
     * Handles a single incoming event: forwards it when active or when not collecting, otherwise
     * buffers it (evicting the oldest buffered event if the buffer is full).
     *
     * @param event        the incoming event
     * @param inputHandler handler the event is forwarded to when it is not buffered
     * @throws InterruptedException if forwarding to {@code inputHandler} is interrupted
     */
    public void sendEvent(Event event, InputHandler inputHandler) throws InterruptedException {
        if (this.isActiveNode) {
            this.lastProcessedEventTimestamp = event.getTimestamp();
            inputHandler.send(event);
            return;
        }
        synchronized (this) {
            if (!this.collectEvents) {
                inputHandler.send(event);
            } else {
                bufferEvent(event);
            }
        }
    }

    /**
     * Handles a batch of incoming events with the same rules as the single-event overload.
     *
     * <p>While collecting, only the newest {@code queueCapacity} events of the combined
     * buffer-plus-batch are retained; a batch larger than the buffer therefore keeps only its
     * trailing events instead of failing.
     *
     * @param eventArr     the incoming events, oldest first
     * @param inputHandler handler the events are forwarded to when they are not buffered
     * @throws InterruptedException if forwarding to {@code inputHandler} is interrupted
     */
    public void sendEvent(Event[] eventArr, InputHandler inputHandler) throws InterruptedException {
        if (this.isActiveNode) {
            // An empty batch carries no timestamp; indexing length - 1 would be out of bounds.
            if (eventArr.length > 0) {
                this.lastProcessedEventTimestamp = eventArr[eventArr.length - 1].getTimestamp();
            }
            inputHandler.send(eventArr);
            return;
        }
        synchronized (this) {
            // Checked under the lock (as in the single-event overload) so that a batch cannot be
            // buffered after processBufferedEvents has switched collection off and drained.
            if (!this.collectEvents) {
                inputHandler.send(eventArr);
                return;
            }
            // Events earlier than this index could never survive eviction, so skip them.
            int firstRetained = Math.max(0, eventArr.length - this.queueCapacity);
            for (int i = firstRetained; i < eventArr.length; i++) {
                bufferEvent(eventArr[i]);
            }
        }
    }

    /**
     * Discards buffered events whose timestamp is not greater than {@code j}, forwards the
     * remaining buffered events to this handler's input handler, and switches collection off.
     *
     * <p>The buffer is drained once without holding the lock, so that senders can keep buffering
     * meanwhile, and once more under the lock together with the flag change, so that no event can
     * be left behind in the buffer after collection has been switched off.
     *
     * <p>If forwarding is interrupted, the error is logged, draining continues, and the calling
     * thread's interrupt status is restored before this method returns.
     *
     * @param j timestamp up to which (inclusive) buffered events are considered already processed
     */
    public void processBufferedEvents(long j) {
        synchronized (this) {
            // Under the lock so a concurrent eviction cannot swap the head between peek and poll.
            Event head = this.passiveNodeBufferedEvents.peek();
            while (head != null && head.getTimestamp() <= j) {
                this.passiveNodeBufferedEvents.poll();
                head = this.passiveNodeBufferedEvents.peek();
            }
        }

        boolean interrupted = drainBufferedEvents();

        synchronized (this) {
            collectEvents(false);
            if (log.isDebugEnabled()) {
                log.debug("Setting Source Handler with ID " + this.sourceHandlerElementId + " to stop collecting events in buffer");
            }
            // Pick up anything buffered while the first drain was running.
            interrupted |= drainBufferedEvents();
        }

        if (interrupted) {
            // Re-assert the interrupt that was consumed while draining so callers can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /** Marks this handler as active; subsequent events bypass the buffer. */
    public void setAsActive() {
        this.isActiveNode = true;
    }

    /**
     * Switches buffering of events on a non-active handler on or off.
     *
     * @param z {@code true} to buffer incoming events, {@code false} to forward them directly
     */
    public void collectEvents(boolean z) {
        this.collectEvents = z;
    }

    /**
     * Returns the live buffer of collected events (not a copy).
     *
     * @return the internal event buffer
     */
    public Queue<Event> getPassiveNodeBufferedEvents() {
        return this.passiveNodeBufferedEvents;
    }

    /**
     * Captures the timestamp of the last event forwarded while active.
     *
     * @return a map with the single key
     *         {@link CoordinationConstants#ACTIVE_PROCESSED_LAST_TIMESTAMP}
     */
    public Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<String, Object>();
        state.put(CoordinationConstants.ACTIVE_PROCESSED_LAST_TIMESTAMP, Long.valueOf(this.lastProcessedEventTimestamp));
        if (log.isDebugEnabled()) {
            log.debug("Active Node: Saving state of Source Handler with Id " + getElementId() + " with timestamp " + this.lastProcessedEventTimestamp);
        }
        return state;
    }

    /**
     * Replays the buffer relative to the timestamp stored in {@code map}; does nothing when the
     * map is {@code null} or carries no timestamp entry.
     *
     * @param map state previously produced by {@link #currentState()}
     */
    public void restoreState(Map<String, Object> map) {
        if (map == null) {
            return;
        }
        Object lastTimestamp = map.get(CoordinationConstants.ACTIVE_PROCESSED_LAST_TIMESTAMP);
        if (lastTimestamp == null) {
            return;
        }
        processBufferedEvents(((Long) lastTimestamp).longValue());
    }

    /**
     * Returns the element id supplied to {@link #init(String, StreamDefinition)}.
     *
     * @return the element id, or {@code null} if {@code init} has not been called
     */
    public String getElementId() {
        return this.sourceHandlerElementId;
    }

    /**
     * Appends {@code event} to the buffer, evicting the oldest entries until it fits.
     * Callers must hold this object's monitor.
     */
    private void bufferEvent(Event event) {
        while (!this.passiveNodeBufferedEvents.offer(event)) {
            // poll() rather than remove(): never throws, even if the queue was emptied meanwhile.
            this.passiveNodeBufferedEvents.poll();
        }
    }

    /**
     * Forwards every currently buffered event to the input handler, oldest first.
     *
     * @return {@code true} if at least one send was interrupted
     */
    private boolean drainBufferedEvents() {
        boolean interrupted = false;
        Event buffered;
        while ((buffered = this.passiveNodeBufferedEvents.poll()) != null) {
            try {
                getInputHandler().send(buffered);
            } catch (InterruptedException e) {
                // Keep draining, as before, but remember the interrupt for the caller to restore.
                interrupted = true;
                log.error("Error Resending Passive Node Events after State Sync. ", e);
            }
        }
        return interrupted;
    }
}
