/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FileRegionBuffer;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link InputChannel} that reads buffers through a {@link ResultSubpartitionView} obtained
 * from the {@link ResultPartitionManager} it was constructed with.
 *
 * <p>The view is created lazily by {@link #requestSubpartition(int)}. Creation, the timer-based
 * retrigger and {@link #checkAndWaitForSubpartitionView()} all synchronize on {@code requestLock};
 * {@code subpartitionView} and {@code isReleased} are volatile and are additionally read without
 * the lock by several methods.
 */
public class LocalInputChannel
extends InputChannel
implements BufferAvailabilityListener {
    private static final Logger LOG = LoggerFactory.getLogger(LocalInputChannel.class);

    /** Guards creation of the subpartition view, including retriggered requests. */
    private final Object requestLock = new Object();

    /** Used to create the subpartition view for this channel's partition. */
    private final ResultPartitionManager partitionManager;

    /** Used by {@link #sendTaskEvent(TaskEvent)} to publish events for the partition. */
    private final TaskEventPublisher taskEventPublisher;

    /** The view being consumed; {@code null} before the request succeeds and after release. */
    @Nullable
    private volatile ResultSubpartitionView subpartitionView;

    /** Set to {@code true} by {@link #releaseAllResources()}. */
    private volatile boolean isReleased;

    /** Receives every non-empty buffer read by {@link #getNextBuffer()}. */
    private final ChannelStatePersister channelStatePersister;

    /**
     * Creates the channel.
     *
     * @param inputGate forwarded to the {@link InputChannel} constructor
     * @param channelIndex forwarded to the {@link InputChannel} constructor
     * @param partitionId forwarded to the {@link InputChannel} constructor
     * @param partitionManager manager used to create the subpartition view; must not be null
     * @param taskEventPublisher publisher used for task events; must not be null
     * @param initialBackoff forwarded to the {@link InputChannel} constructor
     * @param maxBackoff forwarded to the {@link InputChannel} constructor
     * @param numBytesIn forwarded to the {@link InputChannel} constructor
     * @param numBuffersIn forwarded to the {@link InputChannel} constructor
     * @param stateWriter writer handed to the {@link ChannelStatePersister} of this channel
     */
    public LocalInputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, ResultPartitionManager partitionManager, TaskEventPublisher taskEventPublisher, int initialBackoff, int maxBackoff, Counter numBytesIn, Counter numBuffersIn, ChannelStateWriter stateWriter) {
        super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, numBytesIn, numBuffersIn);
        this.partitionManager = Preconditions.checkNotNull(partitionManager);
        this.taskEventPublisher = Preconditions.checkNotNull(taskEventPublisher);
        this.channelStatePersister = new ChannelStatePersister(stateWriter, this.getChannelInfo());
    }

    /** Starts persisting for the barrier's checkpoint id with an empty list of known buffers. */
    @Override
    public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException {
        this.channelStatePersister.startPersisting(barrier.getId(), Collections.emptyList());
    }

    /** Stops persisting for the given checkpoint id. */
    @Override
    public void checkpointStopped(long checkpointId) {
        this.channelStatePersister.stopPersisting(checkpointId);
    }

    /**
     * Creates the subpartition view if it does not exist yet.
     *
     * <p>If the partition cannot be found and {@code increaseBackoff()} returns {@code true}, the
     * exception is swallowed and a retrigger is requested from the input gate; otherwise the
     * {@link PartitionNotFoundException} is propagated to the caller.
     *
     * @param subpartitionIndex index of the subpartition to request
     * @throws IOException if the manager returns no view, or the partition is not found and the
     *     backoff could not be increased
     * @throws IllegalStateException if the channel has already been released
     */
    @Override
    protected void requestSubpartition(int subpartitionIndex) throws IOException {
        boolean retriggerRequest = false;
        boolean notifyDataAvailable = false;
        synchronized (this.requestLock) {
            Preconditions.checkState(!this.isReleased, "LocalInputChannel has been released already");
            if (this.subpartitionView == null) {
                LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}. {}", this, subpartitionIndex, this.partitionId, this.channelStatePersister);
                try {
                    ResultSubpartitionView view = this.partitionManager.createSubpartitionView(this.partitionId, subpartitionIndex, this);
                    if (view == null) {
                        throw new IOException("Error requesting subpartition.");
                    }
                    // Publish the view before re-checking the release flag.
                    this.subpartitionView = view;
                    if (this.isReleased) {
                        // releaseAllResources() is not synchronized on requestLock, so the
                        // channel may have been released while the view was being created.
                        view.releaseAllResources();
                        this.subpartitionView = null;
                    } else {
                        notifyDataAvailable = true;
                    }
                }
                catch (PartitionNotFoundException notFound) {
                    // Only give up when no further backoff is possible; otherwise fall through
                    // so that the retrigger below is actually reached.
                    if (this.increaseBackoff()) {
                        retriggerRequest = true;
                    } else {
                        throw notFound;
                    }
                }
            }
        }
        // Both callbacks are deliberately issued after leaving the synchronized block.
        if (notifyDataAvailable) {
            this.notifyDataAvailable();
        }
        if (retriggerRequest) {
            this.inputGate.retriggerPartitionRequest(this.partitionId.getPartitionId());
        }
    }

    /**
     * Schedules a new {@link #requestSubpartition(int)} call on the given timer after the current
     * backoff. Any {@link Throwable} raised by the delayed request is recorded via
     * {@code setError}.
     *
     * @param timer timer on which the request is scheduled
     * @param subpartitionIndex index of the subpartition to request
     * @throws IllegalStateException if a subpartition view already exists
     */
    void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) {
        synchronized (this.requestLock) {
            Preconditions.checkState(this.subpartitionView == null, "already requested partition");
            timer.schedule(new TimerTask(){

                @Override
                public void run() {
                    try {
                        LocalInputChannel.this.requestSubpartition(subpartitionIndex);
                    }
                    catch (Throwable t) {
                        LocalInputChannel.this.setError(t);
                    }
                }
            }, this.getCurrentBackoff());
        }
    }

    /**
     * Reads the next non-empty buffer from the subpartition view.
     *
     * @return the buffer with its availability information, or an empty optional if the channel
     *     is released without a view or the view currently has no data
     * @throws IOException declared by the overridden method; also covers {@code checkError()}
     * @throws CancelTaskException if the view has no data and reports itself as released
     */
    @Override
    Optional<InputChannel.BufferAndAvailability> getNextBuffer() throws IOException {
        this.checkError();
        ResultSubpartitionView view = this.subpartitionView;
        if (view == null) {
            if (this.isReleased) {
                return Optional.empty();
            }
            // Blocks on requestLock until a concurrently running request has finished.
            view = this.checkAndWaitForSubpartitionView();
        }
        ResultSubpartition.BufferAndBacklog next = view.getNextBuffer();
        // Skip buffers without readable bytes: recycle them and count them as received.
        while (next != null && next.buffer().readableBytes() == 0) {
            next.buffer().recycleBuffer();
            next = view.getNextBuffer();
            this.numBuffersIn.inc();
        }
        if (next == null) {
            if (view.isReleased()) {
                throw new CancelTaskException("Consumed partition " + view + " has been released.");
            }
            return Optional.empty();
        }
        Buffer buffer = next.buffer();
        if (buffer instanceof FileRegionBuffer) {
            // Copy the file region into an unpooled segment provided by the input gate.
            buffer = ((FileRegionBuffer)buffer).readInto(this.inputGate.getUnpooledSegment());
        }
        this.numBytesIn.inc(buffer.getSize());
        this.numBuffersIn.inc();
        this.channelStatePersister.checkForBarrier(buffer);
        this.channelStatePersister.maybePersist(buffer);
        NetworkActionsLogger.traceInput("LocalInputChannel#getNextBuffer", buffer, this.inputGate.getOwningTaskName(), this.channelInfo, this.channelStatePersister, next.getSequenceNumber());
        return Optional.of(new InputChannel.BufferAndAvailability(buffer, next.getNextDataType(), next.buffersInBacklog(), next.getSequenceNumber()));
    }

    /** Availability callback: marks this channel as non-empty. */
    @Override
    public void notifyDataAvailable() {
        this.notifyChannelNonEmpty();
    }

    /**
     * Returns the subpartition view after acquiring {@code requestLock}, i.e. after any request
     * currently holding the lock has completed.
     *
     * @throws IllegalStateException if the channel is released or no view has been created
     */
    private ResultSubpartitionView checkAndWaitForSubpartitionView() {
        synchronized (this.requestLock) {
            Preconditions.checkState(!this.isReleased, "released");
            Preconditions.checkState(this.subpartitionView != null, "Queried for a buffer before requesting the subpartition.");
            return this.subpartitionView;
        }
    }

    /**
     * Resumes consumption on the view and re-announces this channel as non-empty if the view
     * reports data as available.
     *
     * @throws IllegalStateException if the channel is released
     * @throws NullPointerException if no subpartition view exists
     */
    @Override
    public void resumeConsumption() {
        Preconditions.checkState(!this.isReleased, "Channel released.");
        ResultSubpartitionView view = Preconditions.checkNotNull(this.subpartitionView);
        view.resumeConsumption();
        if (view.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable()) {
            this.notifyChannelNonEmpty();
        }
    }

    /**
     * Forwards the acknowledgement to the subpartition view.
     *
     * @throws IllegalStateException if the channel is released
     */
    @Override
    public void acknowledgeAllRecordsProcessed() throws IOException {
        Preconditions.checkState(!this.isReleased, "Channel released.");
        this.subpartitionView.acknowledgeAllDataProcessed();
    }

    /**
     * Publishes a task event for this channel's partition.
     *
     * @throws IOException if the publisher reports that the event could not be delivered
     * @throws IllegalStateException if the subpartition has not been requested yet
     */
    @Override
    void sendTaskEvent(TaskEvent event) throws IOException {
        this.checkError();
        Preconditions.checkState(this.subpartitionView != null, "Tried to send task event to producer before requesting the subpartition.");
        if (!this.taskEventPublisher.publish(this.partitionId, event)) {
            throw new IOException("Error while publishing event " + event + " to producer. The producer could not be found.");
        }
    }

    @Override
    boolean isReleased() {
        return this.isReleased;
    }

    /** Marks the channel as released and releases the subpartition view, if one exists. */
    @Override
    void releaseAllResources() throws IOException {
        if (!this.isReleased) {
            this.isReleased = true;
            ResultSubpartitionView view = this.subpartitionView;
            if (view != null) {
                view.releaseAllResources();
                this.subpartitionView = null;
            }
        }
    }

    /**
     * Forwards the new buffer size to the subpartition view.
     *
     * @throws IllegalStateException if the channel is released
     * @throws NullPointerException if no subpartition view exists
     */
    @Override
    void announceBufferSize(int newBufferSize) {
        Preconditions.checkState(!this.isReleased, "Channel released.");
        ResultSubpartitionView view = Preconditions.checkNotNull(this.subpartitionView);
        view.notifyNewBufferSize(newBufferSize);
    }

    /**
     * Returns the number of buffers queued in the subpartition view, or 0 if there is no view
     * (before the request or after release).
     */
    @Override
    int getBuffersInUseCount() {
        // Read the volatile field once so a concurrent release cannot null it between the
        // check and the call.
        ResultSubpartitionView view = this.subpartitionView;
        return view == null ? 0 : view.getNumberOfQueuedBuffers();
    }

    /** Unsynchronized variant of the queued-buffer count; 0 if there is no view. */
    @Override
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        ResultSubpartitionView view = this.subpartitionView;
        if (view != null) {
            return view.unsynchronizedGetNumberOfQueuedBuffers();
        }
        return 0;
    }

    @Override
    public String toString() {
        return "LocalInputChannel [" + this.partitionId + "]";
    }

    @VisibleForTesting
    ResultSubpartitionView getSubpartitionView() {
        return this.subpartitionView;
    }
}

