/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.kafka.multidc.source;

import io.siddhi.core.stream.input.source.SourceEventListener;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;

public class SourceSynchronizer {
    private static final Logger LOG = Logger.getLogger(SourceSynchronizer.class);
    private final SourceEventListener eventListener;
    boolean isEventGap = false;
    Map<Long, BufferValueHolder> eventBuffer = new TreeMap<Long, BufferValueHolder>();
    Map<String, Long> perSourceReceivedSeqNo = new HashMap<String, Long>();
    Timer flushBufferTimer = new Timer(true);
    String[] bootstrapServers = new String[2];
    List<Long> toRemoveSeqNos = new ArrayList<Long>();
    private Long lastConsumedSeqNo = -1L;
    private int maxBufferSize;
    private int bufferInterval;
    private AtomicBoolean isFlushTaskDue = new AtomicBoolean(false);

    public SourceSynchronizer(SourceEventListener eventListener, String[] bootstrapServers, int maxBufferSize, int bufferFlushInterval) {
        this.eventListener = eventListener;
        this.bootstrapServers[0] = bootstrapServers[0];
        this.bootstrapServers[1] = bootstrapServers[1];
        this.maxBufferSize = maxBufferSize;
        this.bufferInterval = bufferFlushInterval;
        this.perSourceReceivedSeqNo.put(bootstrapServers[0], -1L);
        this.perSourceReceivedSeqNo.put(bootstrapServers[1], -1L);
    }

    private synchronized void forceFlushBuffer(long flushTillSeqNo) {
        for (Map.Entry<Long, BufferValueHolder> entry : this.eventBuffer.entrySet()) {
            Long sequenceNumber = entry.getKey();
            BufferValueHolder eventHolder = entry.getValue();
            if (sequenceNumber <= this.lastConsumedSeqNo || sequenceNumber > flushTillSeqNo) continue;
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Updating the lastConsumedSeqNo=" + sequenceNumber + " as the event is forcefully flushed, from the source " + eventHolder.getSourceId()));
            }
            if (sequenceNumber >= this.lastConsumedSeqNo && this.lastConsumedSeqNo != sequenceNumber + 1L) {
                LOG.warn((Object)("Events lost from sequence " + (this.lastConsumedSeqNo + 1L) + " to " + (sequenceNumber - 1L)));
            }
            this.lastConsumedSeqNo = sequenceNumber;
            this.toRemoveSeqNos.add(sequenceNumber);
            this.eventListener.onEvent(eventHolder.getEvent(), eventHolder.getStrings());
        }
        this.toRemoveSeqNos.forEach(seqNo -> this.eventBuffer.remove(seqNo));
        this.toRemoveSeqNos.clear();
    }

    private synchronized void flushBuffer() {
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)"Start flushing buffer");
        }
        for (Map.Entry<Long, BufferValueHolder> entry : this.eventBuffer.entrySet()) {
            Long sequenceNumber = entry.getKey();
            BufferValueHolder eventHolder = entry.getValue();
            if (sequenceNumber <= this.lastConsumedSeqNo) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Message with sequence " + sequenceNumber + " already received. Dropping the event from the buffer"));
                }
                this.toRemoveSeqNos.add(sequenceNumber);
                continue;
            }
            if (sequenceNumber == this.lastConsumedSeqNo + 1L) {
                this.isEventGap = false;
                Long l = this.lastConsumedSeqNo;
                Long l2 = this.lastConsumedSeqNo = Long.valueOf(this.lastConsumedSeqNo + 1L);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Message with sequence " + sequenceNumber + " flushed from buffer. Updating lastConsumedSeqNo=" + this.lastConsumedSeqNo));
                }
                this.toRemoveSeqNos.add(sequenceNumber);
                this.eventListener.onEvent(eventHolder.getEvent(), eventHolder.getStrings());
                continue;
            }
            this.isEventGap = true;
            if (!LOG.isDebugEnabled()) break;
            LOG.debug((Object)("Gap detected while flushing the buffer. Flushed message sequence=" + sequenceNumber + ". Expected sequence=" + (this.lastConsumedSeqNo + 1L) + ". Stop flushing the buffer."));
            break;
        }
        this.toRemoveSeqNos.forEach(seqNo -> this.eventBuffer.remove(seqNo));
        this.toRemoveSeqNos.clear();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)"End flushing buffer");
        }
    }

    private synchronized void bufferEvent(String sourceId, long sequenceNumber, Object event, String[] strings) {
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Buffering Event. SourceId=" + sourceId + ", SequenceNumber=" + sequenceNumber));
        }
        if (this.eventBuffer.size() >= this.maxBufferSize) {
            long flushTillSeq = Math.max(this.perSourceReceivedSeqNo.get(this.bootstrapServers[0]), this.perSourceReceivedSeqNo.get(this.bootstrapServers[1]));
            LOG.info((Object)("Buffer size exceeded. Force flushing events till the sequence " + sequenceNumber));
            this.forceFlushBuffer(flushTillSeq);
        }
        this.eventBuffer.put(sequenceNumber, new BufferValueHolder(event, sourceId, strings));
    }

    public synchronized void onEvent(String sourceId, long sequenceNumber, Object event, String[] strings) {
        this.perSourceReceivedSeqNo.put(sourceId, sequenceNumber);
        if (sequenceNumber <= this.lastConsumedSeqNo) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Message with sequence " + sequenceNumber + " already received. Dropping the event from source " + sourceId + ":" + event));
            }
        } else if (sequenceNumber == this.lastConsumedSeqNo + 1L) {
            Long l = this.lastConsumedSeqNo;
            Long l2 = this.lastConsumedSeqNo = Long.valueOf(this.lastConsumedSeqNo + 1L);
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Message with sequence " + sequenceNumber + " received from source " + sourceId + ". Updating lastConsumedSeqNo=" + this.lastConsumedSeqNo));
            }
            this.eventListener.onEvent(event, strings);
            if (!this.eventBuffer.isEmpty()) {
                this.flushBuffer();
            }
        } else if (this.isEventGap) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Message with sequence " + sequenceNumber + " from source" + sourceId + ". Couldn't fill the gap, buffering the event."));
            }
            this.bufferEvent(sourceId, sequenceNumber, event, strings);
            long flushTillSeq = Math.min(this.perSourceReceivedSeqNo.get(this.bootstrapServers[0]), this.perSourceReceivedSeqNo.get(this.bootstrapServers[1]));
            this.isEventGap = false;
            this.forceFlushBuffer(flushTillSeq);
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Gap detected. Message with sequence " + sequenceNumber + " received from source " + sourceId + ". Expected sequence number is " + (this.lastConsumedSeqNo + 1L) + ". Starting buffering events"));
            }
            this.isEventGap = true;
            this.bufferEvent(sourceId, sequenceNumber, event, strings);
            if (!this.isFlushTaskDue.get()) {
                this.flushBufferTimer.schedule((TimerTask)new BufferFlushTask(), this.bufferInterval);
                this.isFlushTaskDue.set(true);
            }
        }
    }

    public synchronized Long getLastConsumedSeqNo() {
        return this.lastConsumedSeqNo;
    }

    public synchronized void setLastConsumedSeqNo(long seqNo) {
        this.lastConsumedSeqNo = seqNo;
    }

    class BufferFlushTask
    extends TimerTask {
        private final Logger log = Logger.getLogger(BufferFlushTask.class);

        BufferFlushTask() {
        }

        @Override
        public synchronized void run() {
            SourceSynchronizer.this.isFlushTaskDue.set(false);
            long flushTillSeq = Math.max(SourceSynchronizer.this.perSourceReceivedSeqNo.get(SourceSynchronizer.this.bootstrapServers[0]), SourceSynchronizer.this.perSourceReceivedSeqNo.get(SourceSynchronizer.this.bootstrapServers[1]));
            if (this.log.isDebugEnabled()) {
                this.log.debug((Object)("Executing the buffer flushing task. Flushing buffers till " + flushTillSeq));
            }
            SourceSynchronizer.this.forceFlushBuffer(flushTillSeq);
        }
    }

    static class BufferValueHolder {
        String[] strings;
        private Object event;
        private String sourceId;

        BufferValueHolder(Object event, String sourceId, String[] strings) {
            this.event = event;
            this.sourceId = sourceId;
            this.strings = strings;
        }

        String[] getStrings() {
            return this.strings;
        }

        String getSourceId() {
            return this.sourceId;
        }

        public Object getEvent() {
            return this.event;
        }
    }
}

