/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.ArrayDeque;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;

/**
 * A {@link CheckpointBarrierHandler} that counts the {@link CheckpointBarrier}s arriving from an
 * {@link InputGate} and notifies a registered listener once a barrier with the same checkpoint ID
 * has been counted as many times as the gate has input channels.
 *
 * <p>This handler never holds data back: every buffer and every non-barrier event pulled from the
 * gate is returned to the caller immediately; only barriers are consumed internally.
 *
 * <p>At most {@link #MAX_CHECKPOINTS_TO_TRACK} checkpoints are tracked at the same time; when that
 * limit is exceeded the oldest tracked checkpoint is dropped.
 */
@Internal
public class BarrierTracker
implements CheckpointBarrierHandler {
    /** Upper bound on the number of checkpoints whose barriers are counted concurrently. */
    private static final int MAX_CHECKPOINTS_TO_TRACK = 50;

    /** The gate from which buffers and events are pulled. */
    private final InputGate inputGate;

    /** Number of channels of the gate, i.e. the barrier count that completes a checkpoint. */
    private final int totalNumberOfInputChannels;

    /** Checkpoints for which at least one barrier was seen, in order of first arrival. */
    private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;

    /** Listener notified when all barriers of a checkpoint have arrived; may be {@code null}. */
    private EventListener<CheckpointBarrier> checkpointHandler;

    /** Highest checkpoint ID for which tracking was ever started; {@code -1} if none yet. */
    private long latestPendingCheckpointID = -1L;

    /**
     * Creates a tracker that reads from the given gate.
     *
     * @param inputGate the gate to pull buffers and events from; its channel count is read once
     *     here and used as the number of barriers required per checkpoint
     */
    public BarrierTracker(InputGate inputGate) {
        this.inputGate = inputGate;
        this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
        this.pendingCheckpoints = new ArrayDeque<>();
    }

    /**
     * Returns the next buffer or non-barrier event from the input gate. Checkpoint barriers
     * encountered on the way are processed internally and not returned.
     *
     * @return the next buffer or non-barrier event, or {@code null} once the gate returns
     *     {@code null}
     * @throws IOException if thrown by the input gate
     * @throws InterruptedException if thrown by the input gate
     */
    @Override
    public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
        BufferOrEvent next;
        while ((next = this.inputGate.getNextBufferOrEvent()) != null) {
            // Exact class comparison (not instanceof): only plain CheckpointBarrier events are
            // intercepted, everything else is handed to the caller.
            if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
                return next;
            }
            this.processBarrier((CheckpointBarrier) next.getEvent());
        }
        return null;
    }

    /**
     * Registers the listener that is notified when a checkpoint's barriers are complete.
     *
     * @param checkpointHandler the listener to register
     * @throws IllegalStateException if a listener has already been registered
     */
    @Override
    public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
        if (this.checkpointHandler != null) {
            throw new IllegalStateException("BarrierTracker already has a registered checkpoint handler");
        }
        this.checkpointHandler = checkpointHandler;
    }

    /** Discards all currently tracked (incomplete) checkpoints. */
    @Override
    public void cleanup() {
        this.pendingCheckpoints.clear();
    }

    /**
     * Reports whether any checkpoint is currently being tracked.
     *
     * @return {@code true} if no checkpoint is pending
     */
    @Override
    public boolean isEmpty() {
        return this.pendingCheckpoints.isEmpty();
    }

    /**
     * Accounts for one received barrier and notifies the listener if this barrier completes its
     * checkpoint.
     *
     * @param receivedBarrier the barrier that was just pulled from the input gate
     */
    private void processBarrier(CheckpointBarrier receivedBarrier) {
        // Fast path: with a single channel every barrier completes its checkpoint right away.
        if (this.totalNumberOfInputChannels == 1) {
            this.notifyCheckpoint(receivedBarrier);
            return;
        }

        final long barrierId = receivedBarrier.getId();

        // Locate the counter for this checkpoint, remembering its position in the queue.
        CheckpointBarrierCount cbc = null;
        int pos = 0;
        for (CheckpointBarrierCount next : this.pendingCheckpoints) {
            if (next.checkpointId == barrierId) {
                cbc = next;
                break;
            }
            ++pos;
        }

        if (cbc != null) {
            int numBarriersNew = cbc.incrementBarrierCount();
            if (numBarriersNew == this.totalNumberOfInputChannels) {
                // Remove the completed checkpoint together with every entry queued before it.
                for (int i = 0; i <= pos; ++i) {
                    this.pendingCheckpoints.pollFirst();
                }
                this.notifyCheckpoint(receivedBarrier);
            }
        } else if (barrierId > this.latestPendingCheckpointID) {
            // First barrier of a checkpoint newer than anything tracked so far. Barriers whose
            // ID is not newer and that have no pending entry are ignored.
            this.latestPendingCheckpointID = barrierId;
            this.pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));

            // Bound the queue by evicting the oldest entry.
            if (this.pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
                this.pendingCheckpoints.pollFirst();
            }
        }
    }

    /** Forwards the barrier to the registered listener, if there is one. */
    private void notifyCheckpoint(CheckpointBarrier barrier) {
        if (this.checkpointHandler != null) {
            this.checkpointHandler.onEvent(barrier);
        }
    }

    /** Mutable counter of the barriers received so far for one checkpoint ID. */
    private static final class CheckpointBarrierCount {
        private final long checkpointId;
        private int barrierCount;

        /** Starts tracking the given checkpoint with a count of one (the barrier just seen). */
        private CheckpointBarrierCount(long checkpointId) {
            this.checkpointId = checkpointId;
            this.barrierCount = 1;
        }

        /**
         * Increments the barrier count by one.
         *
         * @return the count after the increment
         */
        public int incrementBarrierCount() {
            return ++this.barrierCount;
        }

        // NOTE(review): hashCode/equals include the mutable barrierCount, so instances must not
        // be used as keys in hash-based collections while they are being counted.
        @Override
        public int hashCode() {
            return (int) ((this.checkpointId >>> 32) ^ this.checkpointId) + 17 * this.barrierCount;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof CheckpointBarrierCount) {
                CheckpointBarrierCount that = (CheckpointBarrierCount) obj;
                return this.checkpointId == that.checkpointId && this.barrierCount == that.barrierCount;
            }
            return false;
        }

        @Override
        public String toString() {
            return String.format("checkpointID=%d, count=%d", this.checkpointId, this.barrierCount);
        }
    }
}

