package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.PrioritizedDeque;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.class */
/**
 * Input gate that exposes several {@link IndexedInputGate}s as one logical gate.
 *
 * <p>The channels of the unioned gates are laid out back to back in the order in which the gates
 * were passed to the constructor. For a union-wide channel index {@code i},
 * {@code inputChannelToInputGateIndex[i]} holds the gate index of the owning gate and
 * {@code inputGateChannelIndexOffsets[gateIndex]} holds the union-wide index of that gate's first
 * channel, so the gate-local channel index is {@code i - inputGateChannelIndexOffsets[gateIndex]}.
 *
 * <p>Gates that may have data to hand out are kept in {@code inputGatesWithData}; that deque also
 * serves as the monitor guarding the queueing logic.
 *
 * <p>NOTE(review): this source was recovered by a decompiler and {@code queueInputGate} could not
 * be decompiled. Every code path that enqueues a gate ends up in that stub, so the class is not
 * functional until the method body is restored from the upstream sources.
 */
public class UnionInputGate extends InputGate {
    /** The unioned gates, keyed by their gate index. */
    private final Map<Integer, InputGate> inputGatesByGateIndex;

    /** Gates that have not been removed yet by {@link #handleEndOfPartitionEvent}. */
    private final Set<IndexedInputGate> inputGatesWithRemainingData;

    /** Gates queued for polling; also the monitor used for synchronization and waiting. */
    private final PrioritizedDeque<IndexedInputGate> inputGatesWithData = new PrioritizedDeque<>();

    /** Maps a union-wide channel index to the gate index of the gate owning that channel. */
    private final int[] inputChannelToInputGateIndex;

    /** Maps a gate index to the union-wide index of that gate's first channel. */
    private final int[] inputGateChannelIndexOffsets;

    /**
     * Creates a union over the given gates.
     *
     * <p>Arguments are validated before any state is built. The gate-index map is created with
     * {@code Collectors.toMap}, which itself fails with an {@code IllegalStateException} on
     * duplicate keys; validating first guarantees that callers get the descriptive
     * {@code IllegalArgumentException} instead.
     *
     * @param inputGates the gates to union; at least two, with pairwise distinct gate indices
     * @throws IllegalArgumentException if fewer than two gates are given or two gates share a
     *     gate index
     */
    public UnionInputGate(IndexedInputGate... inputGates) {
        Preconditions.checkArgument(inputGates.length > 1, "Union input gate should union at least two input gates.");
        if (Arrays.stream(inputGates).map(IndexedInputGate::getGateIndex).distinct().count() != inputGates.length) {
            throw new IllegalArgumentException("Union of two input gates with the same gate index. Given indices: "
                    + Arrays.stream(inputGates).map(IndexedInputGate::getGateIndex).collect(Collectors.toList()));
        }

        this.inputGatesByGateIndex =
                Arrays.stream(inputGates).collect(Collectors.toMap(IndexedInputGate::getGateIndex, gate -> gate));
        this.inputGatesWithRemainingData = Sets.newHashSetWithExpectedSize(inputGates.length);

        final int maxGateIndex = Arrays.stream(inputGates).mapToInt(IndexedInputGate::getGateIndex).max().orElse(0);
        final int totalChannels =
                Arrays.stream(inputGates).mapToInt(IndexedInputGate::getNumberOfInputChannels).sum();
        // Gate indices need not be dense, so the offset table is sized by the largest index.
        this.inputGateChannelIndexOffsets = new int[maxGateIndex + 1];
        this.inputChannelToInputGateIndex = new int[totalChannels];

        int nextChannelOffset = 0;
        for (IndexedInputGate gate : inputGates) {
            final int firstChannel = nextChannelOffset;
            this.inputGateChannelIndexOffsets[gate.getGateIndex()] = firstChannel;
            nextChannelOffset += gate.getNumberOfInputChannels();
            // All union-wide channel indices in [firstChannel, nextChannelOffset) belong to this gate.
            Arrays.fill(this.inputChannelToInputGateIndex, firstChannel, nextChannelOffset, gate.getGateIndex());
        }

        synchronized (this.inputGatesWithData) {
            for (IndexedInputGate gate : inputGates) {
                this.inputGatesWithRemainingData.add(gate);

                CompletableFuture<?> availableFuture = gate.getAvailableFuture();
                if (availableFuture.isDone()) {
                    // Already available: enqueue directly instead of going through a callback.
                    this.inputGatesWithData.add(gate);
                } else {
                    FutureUtils.assertNoException(availableFuture.thenRun(() -> queueInputGate(gate, false)));
                }

                FutureUtils.assertNoException(
                        gate.getPriorityEventAvailableFuture().thenRun(() -> handlePriorityEventAvailable(gate)));
            }

            if (!this.inputGatesWithData.isEmpty()) {
                this.availabilityHelper.resetAvailable();
            }
        }
    }

    /** Callback for a gate's priority-event future: queues the gate with the priority flag set. */
    private void handlePriorityEventAvailable(IndexedInputGate inputGate) {
        queueInputGate(inputGate, true);
    }

    /** Returns the total number of channels across all unioned gates. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate, org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput
    public int getNumberOfInputChannels() {
        return this.inputChannelToInputGateIndex.length;
    }

    /**
     * Resolves a union-wide channel index to the channel of the owning gate.
     *
     * @param channelIndex union-wide channel index
     * @return the channel at the corresponding gate-local index of the owning gate
     */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public InputChannel getChannel(int channelIndex) {
        final int gateIndex = this.inputChannelToInputGateIndex[channelIndex];
        final int localChannelIndex = channelIndex - this.inputGateChannelIndexOffsets[gateIndex];
        return this.inputGatesByGateIndex.get(gateIndex).getChannel(localChannelIndex);
    }

    /** Returns {@code true} once no gate with remaining data is left. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate, org.apache.flink.runtime.io.PullingAsyncDataInput
    public boolean isFinished() {
        return this.inputGatesWithRemainingData.isEmpty();
    }

    /** Blocking variant of {@link #pollNext()}: waits until a gate has been queued. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
        return getNextBufferOrEvent(true);
    }

    /** Non-blocking variant of {@link #getNext()}: returns empty if no gate is queued. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate, org.apache.flink.runtime.io.PullingAsyncDataInput
    public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
        return getNextBufferOrEvent(false);
    }

    /**
     * Fetches the next buffer or event from any queued gate.
     *
     * @param blocking whether to wait for a gate to be queued when none is
     * @return the next element, or empty if the union is finished or nothing could be obtained
     */
    private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
        if (this.inputGatesWithRemainingData.isEmpty()) {
            return Optional.empty();
        }

        Optional<InputGate.InputWithData<IndexedInputGate, BufferOrEvent>> next = waitAndGetNextData(blocking);
        if (!next.isPresent()) {
            return Optional.empty();
        }

        InputGate.InputWithData<IndexedInputGate, BufferOrEvent> inputWithData = next.get();
        handleEndOfPartitionEvent(inputWithData.data, inputWithData.input);
        // The element only reflects its own gate; widen the flag if the union still has queued gates.
        if (!inputWithData.data.moreAvailable()) {
            inputWithData.data.setMoreAvailable(inputWithData.moreAvailable);
        }
        return Optional.of(inputWithData.data);
    }

    /**
     * Takes queued gates one at a time and polls them until one yields an element.
     *
     * <p>The monitor is released and re-acquired on every iteration. A gate that yields nothing is
     * not re-queued immediately; instead a callback on its availability future queues it again.
     *
     * @param blocking whether to wait for a gate to be queued when none is
     * @return the polled element together with its gate, or empty if no gate was obtained
     */
    private Optional<InputGate.InputWithData<IndexedInputGate, BufferOrEvent>> waitAndGetNextData(boolean blocking) throws IOException, InterruptedException {
        while (true) {
            synchronized (this.inputGatesWithData) {
                Optional<IndexedInputGate> queuedGate = getInputGate(blocking);
                if (!queuedGate.isPresent()) {
                    return Optional.empty();
                }

                IndexedInputGate inputGate = queuedGate.get();
                Optional<BufferOrEvent> polled = inputGate.pollNext();
                if (polled.isPresent()) {
                    return Optional.of(processBufferOrEvent(inputGate, polled.get()));
                }

                FutureUtils.assertNoException(
                        inputGate.getAvailableFuture().thenRun(() -> queueInputGate(inputGate, false)));
            }
        }
    }

    /**
     * Updates the queueing state after an element was polled from {@code inputGate} and wraps the
     * element for the caller. Must be called while holding the {@code inputGatesWithData} monitor.
     *
     * @param inputGate the gate the element was polled from
     * @param bufferOrEvent the polled element
     * @return the element, its gate, whether any gate is still queued, and whether a priority
     *     element is still queued
     */
    private InputGate.InputWithData<IndexedInputGate, BufferOrEvent> processBufferOrEvent(IndexedInputGate inputGate, BufferOrEvent bufferOrEvent) {
        assert Thread.holdsLock(this.inputGatesWithData);

        if (bufferOrEvent.moreAvailable()) {
            // The gate has more to offer: put it straight back into the deque.
            this.inputGatesWithData.add(inputGate, bufferOrEvent.morePriorityEvents(), false);
        } else if (!inputGate.isFinished()) {
            // Nothing more right now: queue the gate again once it reports availability.
            FutureUtils.assertNoException(
                    inputGate.getAvailableFuture().thenRun(() -> queueInputGate(inputGate, false)));
        }

        if (bufferOrEvent.hasPriority() && !bufferOrEvent.morePriorityEvents()) {
            // Last priority event of this gate for now: re-register on its priority future.
            FutureUtils.assertNoException(
                    inputGate.getPriorityEventAvailableFuture().thenRun(() -> handlePriorityEventAvailable(inputGate)));
        }

        final boolean morePriorityEvents = this.inputGatesWithData.getNumPriorityElements() > 0;
        if (bufferOrEvent.hasPriority() && !morePriorityEvents) {
            this.priorityAvailabilityHelper.resetUnavailable();
        }

        return new InputGate.InputWithData<>(
                inputGate, bufferOrEvent, !this.inputGatesWithData.isEmpty(), morePriorityEvents);
    }

    /**
     * Removes {@code inputGate} from the set of remaining gates when {@code bufferOrEvent} is an
     * {@link EndOfPartitionEvent} and the gate reports itself finished. When that was the last
     * remaining gate, the union is marked available.
     *
     * @throws IllegalStateException if the element still claims more data is available, or the
     *     gate is not in the set of remaining gates
     */
    private void handleEndOfPartitionEvent(BufferOrEvent bufferOrEvent, InputGate inputGate) {
        final boolean isEndOfPartition =
                bufferOrEvent.isEvent() && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class;
        if (!isEndOfPartition || !inputGate.isFinished()) {
            return;
        }

        Preconditions.checkState(!bufferOrEvent.moreAvailable());
        if (!this.inputGatesWithRemainingData.remove(inputGate)) {
            throw new IllegalStateException("Couldn't find input gate in set of remaining input gates.");
        }
        if (isFinished()) {
            markAvailable();
        }
    }

    /**
     * Obtains the pending future from the availability helper under the monitor and completes it
     * after the monitor has been released.
     */
    private void markAvailable() {
        CompletableFuture<?> toNotify;
        synchronized (this.inputGatesWithData) {
            toNotify = this.availabilityHelper.getUnavailableToResetAvailable();
        }
        toNotify.complete(null);
    }

    /** Forwards the task event to every unioned gate. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public void sendTaskEvent(TaskEvent taskEvent) throws IOException {
        for (InputGate inputGate : this.inputGatesByGateIndex.values()) {
            inputGate.sendTaskEvent(taskEvent);
        }
    }

    /** Delegates to the gate whose gate index matches {@code inputChannelInfo.getGateIdx()}. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate, org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput
    public void resumeConsumption(InputChannelInfo inputChannelInfo) throws IOException {
        this.inputGatesByGateIndex.get(inputChannelInfo.getGateIdx()).resumeConsumption(inputChannelInfo);
    }

    /** Intentionally empty: the union itself has nothing to set up. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public void setup() {
    }

    /** Returns a future that completes once the state-consumed futures of all unioned gates have. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public CompletableFuture<Void> getStateConsumedFuture() {
        return CompletableFuture.allOf(
                this.inputGatesByGateIndex.values().stream()
                        .map(InputGate::getStateConsumedFuture)
                        .toArray(CompletableFuture[]::new));
    }

    /** Requests partitions on every unioned gate. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public void requestPartitions() throws IOException {
        for (InputGate inputGate : this.inputGatesByGateIndex.values()) {
            inputGate.requestPartitions();
        }
    }

    /** Intentionally empty: closing the union does not close the unioned gates. */
    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
    }

    @Override
    public String toString() {
        return "UnionInputGate{inputGates=" + this.inputGatesByGateIndex.values() + '}';
    }

    /*
     * NOTE(review): placeholder, not the real implementation. The decompiler failed on this method
     * (see the JADX report below), so the body only throws. It is invoked with priority == false
     * from the availability-future callbacks and with priority == true from
     * handlePriorityEventAvailable. Presumably it adds the gate to inputGatesWithData and wakes up
     * threads blocked in getInputGate() -- TODO restore the body from the upstream Flink sources.
     */
    /*  JADX ERROR: NullPointerException in pass: AttachTryCatchVisitor
        java.lang.NullPointerException
        */
    private void queueInputGate(org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate r6, boolean r7) {
        /*
            Method dump skipped, instructions count: 286
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.queueInputGate(org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate, boolean):void");
    }

    /**
     * Takes the next queued gate. Must be called while holding the {@code inputGatesWithData}
     * monitor.
     *
     * @param blocking if {@code true}, waits on the monitor while no gate is queued; if
     *     {@code false}, resets the availability helper to unavailable and returns empty instead
     * @return the next queued gate, or empty in the non-blocking case when none is queued
     */
    private Optional<IndexedInputGate> getInputGate(boolean blocking) throws InterruptedException {
        assert Thread.holdsLock(this.inputGatesWithData);

        // Loop guards against wake-ups that happen while the deque is still empty.
        while (this.inputGatesWithData.isEmpty()) {
            if (!blocking) {
                this.availabilityHelper.resetUnavailable();
                return Optional.empty();
            }
            this.inputGatesWithData.wait();
        }
        return Optional.of(this.inputGatesWithData.poll());
    }

    /** Calls {@code finishReadRecoveredState()} on every unioned gate. */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public void finishReadRecoveredState() throws IOException {
        for (InputGate inputGate : this.inputGatesByGateIndex.values()) {
            inputGate.finishReadRecoveredState();
        }
    }
}
