package org.wso2.carbon.stream.processor.core.ha;

import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.output.sink.SinkHandler;
import org.wso2.siddhi.core.stream.output.sink.SinkHandlerCallback;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

/* loaded from: input_file:org/wso2/carbon/stream/processor/core/ha/HACoordinationSinkHandler.class */
public class HACoordinationSinkHandler extends SinkHandler {
    private boolean isActiveNode;
    private long lastPublishedEventTimestamp = 0;
    private Queue<Event> passiveNodeProcessedEvents;
    private String sinkHandlerElementId;
    private final int queueCapacity;

    public HACoordinationSinkHandler(int i) {
        this.queueCapacity = i;
        this.passiveNodeProcessedEvents = new LinkedBlockingQueue(i);
    }

    public void init(String str, StreamDefinition streamDefinition) {
        this.sinkHandlerElementId = str;
    }

    public void handle(Event event, SinkHandlerCallback sinkHandlerCallback) {
        if (this.isActiveNode) {
            this.lastPublishedEventTimestamp = event.getTimestamp();
            sinkHandlerCallback.mapAndSend(event);
        } else {
            synchronized (this) {
                if (!this.passiveNodeProcessedEvents.offer(event)) {
                    this.passiveNodeProcessedEvents.remove();
                    this.passiveNodeProcessedEvents.add(event);
                }
            }
        }
    }

    public void handle(Event[] eventArr, SinkHandlerCallback sinkHandlerCallback) {
        if (this.isActiveNode) {
            this.lastPublishedEventTimestamp = eventArr[eventArr.length - 1].getTimestamp();
            sinkHandlerCallback.mapAndSend(eventArr);
            return;
        }
        synchronized (this) {
            int size = this.passiveNodeProcessedEvents.size() + eventArr.length;
            if (size >= this.queueCapacity) {
                for (int i = this.queueCapacity; i < size; i++) {
                    this.passiveNodeProcessedEvents.remove();
                }
            }
            for (Event event : eventArr) {
                this.passiveNodeProcessedEvents.add(event);
            }
        }
    }

    public void setAsActive() {
        this.isActiveNode = true;
        Iterator<Event> it = this.passiveNodeProcessedEvents.iterator();
        while (it.hasNext()) {
            handle(it.next());
        }
        this.passiveNodeProcessedEvents.clear();
    }

    public long getActiveNodeLastPublishedTimestamp() {
        return this.lastPublishedEventTimestamp;
    }

    public void trimPassiveNodeEventQueue(long j) {
        while (this.passiveNodeProcessedEvents.peek() != null && this.passiveNodeProcessedEvents.peek().getTimestamp() <= j) {
            this.passiveNodeProcessedEvents.remove();
        }
    }

    public Queue<Event> getPassiveNodeProcessedEvents() {
        return this.passiveNodeProcessedEvents;
    }

    public Map<String, Object> currentState() {
        return null;
    }

    public void restoreState(Map<String, Object> map) {
    }

    public String getElementId() {
        return this.sinkHandlerElementId;
    }
}
