package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.shaded.com.google.common.collect.Maps;
import org.apache.flink.shaded.com.google.common.collect.Sets;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.class */
public class UnionInputGate implements InputGate {
    private final InputGate[] inputGates;
    private final Set<InputGate> inputGatesWithRemainingData;
    private final InputGateListener inputGateListener;
    private final int totalNumberOfInputChannels;
    private final Map<InputGate, Integer> inputGateToIndexOffsetMap;
    private boolean requestedPartitionsFlag;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate$InputGateListener.class */
    private static class InputGateListener implements EventListener<InputGate> {
        private final UnionInputGate unionInputGate;
        private final BlockingQueue<InputGate> inputGatesWithData = new LinkedBlockingQueue();
        private final List<EventListener<InputGate>> registeredListeners = new CopyOnWriteArrayList();

        public InputGateListener(InputGate[] inputGateArr, UnionInputGate unionInputGate) {
            for (InputGate inputGate : inputGateArr) {
                inputGate.registerListener(this);
            }
            this.unionInputGate = unionInputGate;
        }

        @Override // org.apache.flink.runtime.util.event.EventListener
        public void onEvent(InputGate inputGate) {
            this.inputGatesWithData.add(inputGate);
            for (int i = 0; i < this.registeredListeners.size(); i++) {
                this.registeredListeners.get(i).onEvent(this.unionInputGate);
            }
        }

        InputGate getNextInputGateToReadFrom() throws InterruptedException {
            return this.inputGatesWithData.take();
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void registerListener(EventListener<InputGate> eventListener) {
            this.registeredListeners.add(Preconditions.checkNotNull(eventListener));
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public UnionInputGate(InputGate... inputGateArr) {
        this.inputGates = (InputGate[]) Preconditions.checkNotNull(inputGateArr);
        Preconditions.checkArgument(inputGateArr.length > 1, "Union input gate should union at least two input gates.");
        this.inputGateToIndexOffsetMap = Maps.newHashMapWithExpectedSize(inputGateArr.length);
        this.inputGatesWithRemainingData = Sets.newHashSetWithExpectedSize(inputGateArr.length);
        int i = 0;
        for (InputGate inputGate : inputGateArr) {
            this.inputGateToIndexOffsetMap.put(Preconditions.checkNotNull(inputGate), Integer.valueOf(i));
            this.inputGatesWithRemainingData.add(inputGate);
            i += inputGate.getNumberOfInputChannels();
        }
        this.totalNumberOfInputChannels = i;
        this.inputGateListener = new InputGateListener(inputGateArr, this);
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public int getNumberOfInputChannels() {
        return this.totalNumberOfInputChannels;
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public boolean isFinished() {
        for (InputGate inputGate : this.inputGates) {
            if (!inputGate.isFinished()) {
                return false;
            }
        }
        return true;
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public void requestPartitions() throws IOException, InterruptedException {
        if (this.requestedPartitionsFlag) {
            return;
        }
        for (InputGate inputGate : this.inputGates) {
            inputGate.requestPartitions();
        }
        this.requestedPartitionsFlag = true;
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
        if (this.inputGatesWithRemainingData.isEmpty()) {
            return null;
        }
        requestPartitions();
        InputGate nextInputGateToReadFrom = this.inputGateListener.getNextInputGateToReadFrom();
        BufferOrEvent nextBufferOrEvent = nextInputGateToReadFrom.getNextBufferOrEvent();
        if (nextBufferOrEvent.isEvent() && nextBufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class && nextInputGateToReadFrom.isFinished() && !this.inputGatesWithRemainingData.remove(nextInputGateToReadFrom)) {
            throw new IllegalStateException("Couldn't find input gate in set of remaining input gates.");
        }
        nextBufferOrEvent.setChannelIndex(this.inputGateToIndexOffsetMap.get(nextInputGateToReadFrom).intValue() + nextBufferOrEvent.getChannelIndex());
        return nextBufferOrEvent;
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public void sendTaskEvent(TaskEvent taskEvent) throws IOException {
        for (InputGate inputGate : this.inputGates) {
            inputGate.sendTaskEvent(taskEvent);
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public void registerListener(EventListener<InputGate> eventListener) {
        this.inputGateListener.registerListener(eventListener);
    }
}
