/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.sink.entry.group;

import com.alibaba.otter.canal.sink.entry.group.GroupBarrier;
import com.alibaba.otter.canal.store.model.Event;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TimelineBarrier
implements GroupBarrier<Event> {
    protected int groupSize;
    protected ReentrantLock lock = new ReentrantLock();
    protected Condition condition = this.lock.newCondition();
    protected volatile long threshold;
    protected BlockingQueue<Long> lastTimestamps = new PriorityBlockingQueue<Long>();

    public TimelineBarrier(int groupSize) {
        this.groupSize = groupSize;
        this.threshold = Long.MIN_VALUE;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void await(Event event) throws InterruptedException {
        long timestamp = this.getTimestamp(event);
        try {
            this.lock.lockInterruptibly();
            this.single(timestamp);
            while (!this.isPermit(event, timestamp)) {
                this.condition.await();
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void await(Event event, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long timestamp = this.getTimestamp(event);
        try {
            this.lock.lockInterruptibly();
            this.single(timestamp);
            while (!this.isPermit(event, timestamp)) {
                this.condition.await(timeout, unit);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void clear(Event event) {
        this.lastTimestamps.remove(this.getTimestamp(event));
    }

    @Override
    public void interrupt() {
    }

    public long state() {
        return this.threshold;
    }

    protected boolean isPermit(Event event, long state) {
        return state <= this.state();
    }

    protected void notify(long minTimestamp) {
        this.condition.signalAll();
    }

    private void single(long timestamp) throws InterruptedException {
        this.lastTimestamps.add(timestamp);
        if (timestamp < this.state()) {
            this.threshold = timestamp;
        }
        if (this.lastTimestamps.size() >= this.groupSize) {
            Long minTimestamp = (Long)this.lastTimestamps.peek();
            if (minTimestamp != null) {
                this.threshold = minTimestamp;
                this.notify(minTimestamp);
            }
        } else {
            this.threshold = Long.MIN_VALUE;
        }
    }

    private Long getTimestamp(Event event) {
        return event.getEntry().getHeader().getExecuteTime();
    }
}

