/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.shaded.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import scala.Tuple2;

public class LocalInputChannelTest {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentConsumeMultiplePartitions() throws Exception {
        int parallelism = 32;
        int producerBufferPoolSize = 33;
        int numberOfBuffersPerChannel = 1024;
        Preconditions.checkArgument((boolean)true);
        Preconditions.checkArgument((boolean)true);
        Preconditions.checkArgument((boolean)true);
        ExecutorService executor = Executors.newFixedThreadPool(64);
        NetworkBufferPool networkBuffers = new NetworkBufferPool(2080, 32768, MemoryType.HEAP);
        ResultPartitionConsumableNotifier partitionConsumableNotifier = (ResultPartitionConsumableNotifier)Mockito.mock(ResultPartitionConsumableNotifier.class);
        IOManager ioManager = (IOManager)Mockito.mock(IOManager.class);
        JobID jobId = new JobID();
        ResultPartitionManager partitionManager = new ResultPartitionManager();
        ResultPartitionID[] partitionIds = new ResultPartitionID[32];
        TestPartitionProducer[] partitionProducers = new TestPartitionProducer[32];
        for (int i = 0; i < 32; ++i) {
            partitionIds[i] = new ResultPartitionID();
            ResultPartition partition = new ResultPartition("Test Name", jobId, partitionIds[i], ResultPartitionType.PIPELINED, false, 32, partitionManager, partitionConsumableNotifier, ioManager, IOManager.IOMode.ASYNC);
            partition.registerBufferPool(networkBuffers.createBufferPool(33, true));
            partitionProducers[i] = new TestPartitionProducer(partition, false, new TestPartitionProducerBufferSource(32, partition.getBufferProvider(), 1024));
            partitionManager.registerResultPartition(partition);
        }
        try {
            int i;
            ArrayList results = Lists.newArrayListWithCapacity((int)33);
            for (i = 0; i < 32; ++i) {
                results.add(executor.submit(partitionProducers[i]));
            }
            for (i = 0; i < 32; ++i) {
                results.add(executor.submit(new TestLocalInputChannelConsumer(i, 32, 1024, networkBuffers.createBufferPool(32, true), partitionManager, new TaskEventDispatcher(), partitionIds)));
            }
            for (Future result : results) {
                result.get();
            }
        }
        finally {
            networkBuffers.destroy();
            executor.shutdown();
        }
    }

    @Test
    public void testPartitionRequestExponentialBackoff() throws Exception {
        Tuple2 backoff = new Tuple2((Object)500, (Object)3000);
        int[] expectedDelays = new int[]{(Integer)backoff._1(), 1000, 2000, (Integer)backoff._2()};
        SingleInputGate inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
        BufferProvider bufferProvider = (BufferProvider)Mockito.mock(BufferProvider.class);
        Mockito.when((Object)inputGate.getBufferProvider()).thenReturn((Object)bufferProvider);
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        LocalInputChannel ch = this.createLocalInputChannel(inputGate, partitionManager, (Tuple2<Integer, Integer>)backoff);
        Mockito.when((Object)partitionManager.createSubpartitionView((ResultPartitionID)Matchers.eq((Object)ch.partitionId), Matchers.eq((int)0), (BufferProvider)Matchers.eq((Object)bufferProvider))).thenThrow(new Throwable[]{new PartitionNotFoundException(ch.partitionId)});
        Timer timer = (Timer)Mockito.mock(Timer.class);
        ((Timer)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Throwable {
                ((TimerTask)invocation.getArguments()[0]).run();
                return null;
            }
        }).when((Object)timer)).schedule((TimerTask)Matchers.any(TimerTask.class), Matchers.anyLong());
        ch.requestSubpartition(0);
        ((ResultPartitionManager)Mockito.verify((Object)partitionManager)).createSubpartitionView((ResultPartitionID)Matchers.eq((Object)ch.partitionId), Matchers.eq((int)0), (BufferProvider)Matchers.eq((Object)bufferProvider));
        int[] nArray = expectedDelays;
        int n = nArray.length;
        for (int i = 0; i < n; ++i) {
            long expected = nArray[i];
            ch.retriggerSubpartitionRequest(timer, 0);
            ((Timer)Mockito.verify((Object)timer)).schedule((TimerTask)Matchers.any(TimerTask.class), Matchers.eq((long)expected));
        }
        try {
            ch.retriggerSubpartitionRequest(timer, 0);
            ch.getNextBuffer();
            Assert.fail((String)"Did not throw expected exception.");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test(expected=CancelTaskException.class)
    public void testProducerFailedException() throws Exception {
        ResultSubpartitionView view = (ResultSubpartitionView)Mockito.mock(ResultSubpartitionView.class);
        Mockito.when((Object)view.isReleased()).thenReturn((Object)true);
        Mockito.when((Object)view.getFailureCause()).thenReturn((Object)new Exception("Expected test exception"));
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        Mockito.when((Object)partitionManager.createSubpartitionView((ResultPartitionID)Matchers.any(ResultPartitionID.class), Matchers.anyInt(), (BufferProvider)Matchers.any(BufferProvider.class))).thenReturn((Object)view);
        SingleInputGate inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
        BufferProvider bufferProvider = (BufferProvider)Mockito.mock(BufferProvider.class);
        Mockito.when((Object)inputGate.getBufferProvider()).thenReturn((Object)bufferProvider);
        LocalInputChannel ch = this.createLocalInputChannel(inputGate, partitionManager, (Tuple2<Integer, Integer>)new Tuple2((Object)0, (Object)0));
        ch.requestSubpartition(0);
        ch.getNextBuffer();
    }

    private LocalInputChannel createLocalInputChannel(SingleInputGate inputGate, ResultPartitionManager partitionManager, Tuple2<Integer, Integer> initialAndMaxRequestBackoff) throws IOException, InterruptedException {
        return new LocalInputChannel(inputGate, 0, new ResultPartitionID(), partitionManager, (TaskEventDispatcher)Mockito.mock(TaskEventDispatcher.class), initialAndMaxRequestBackoff);
    }

    private static class TestLocalInputChannelConsumer
    implements Callable<Void> {
        private final SingleInputGate inputGate;
        private final int numberOfInputChannels;
        private final int numberOfExpectedBuffersPerChannel;

        public TestLocalInputChannelConsumer(int subpartitionIndex, int numberOfInputChannels, int numberOfExpectedBuffersPerChannel, BufferPool bufferPool, ResultPartitionManager partitionManager, TaskEventDispatcher taskEventDispatcher, ResultPartitionID[] consumedPartitionIds) {
            Preconditions.checkArgument((numberOfInputChannels >= 1 ? 1 : 0) != 0);
            Preconditions.checkArgument((numberOfExpectedBuffersPerChannel >= 1 ? 1 : 0) != 0);
            this.inputGate = new SingleInputGate("Test Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), subpartitionIndex, numberOfInputChannels, (PartitionStateChecker)Mockito.mock(PartitionStateChecker.class));
            this.inputGate.setBufferPool(bufferPool);
            for (int i = 0; i < numberOfInputChannels; ++i) {
                this.inputGate.setInputChannel(new IntermediateResultPartitionID(), (InputChannel)new LocalInputChannel(this.inputGate, i, consumedPartitionIds[i], partitionManager, taskEventDispatcher));
            }
            this.numberOfInputChannels = numberOfInputChannels;
            this.numberOfExpectedBuffersPerChannel = numberOfExpectedBuffersPerChannel;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Void call() throws Exception {
            int[] numberOfBuffersPerChannel = new int[this.numberOfInputChannels];
            try {
                BufferOrEvent boe;
                while ((boe = this.inputGate.getNextBufferOrEvent()) != null) {
                    if (!boe.isBuffer()) continue;
                    boe.getBuffer().recycle();
                    int n = boe.getChannelIndex();
                    numberOfBuffersPerChannel[n] = numberOfBuffersPerChannel[n] + 1;
                    if (numberOfBuffersPerChannel[n] <= this.numberOfExpectedBuffersPerChannel) continue;
                    throw new IllegalStateException("Received more buffers than expected on channel " + boe.getChannelIndex() + ".");
                }
                for (int i = 0; i < numberOfBuffersPerChannel.length; ++i) {
                    int actualNumberOfReceivedBuffers = numberOfBuffersPerChannel[i];
                    if (actualNumberOfReceivedBuffers == this.numberOfExpectedBuffersPerChannel) continue;
                    throw new IllegalStateException("Received unexpected number of buffers on channel " + i + " (" + actualNumberOfReceivedBuffers + " instead " + "of " + this.numberOfExpectedBuffersPerChannel + ").");
                }
            }
            finally {
                this.inputGate.releaseAllResources();
            }
            return null;
        }
    }

    private static class TestPartitionProducerBufferSource
    implements TestProducerSource {
        private final BufferProvider bufferProvider;
        private final List<Byte> channelIndexes;

        public TestPartitionProducerBufferSource(int parallelism, BufferProvider bufferProvider, int numberOfBuffersToProduce) {
            this.bufferProvider = bufferProvider;
            this.channelIndexes = Lists.newArrayListWithCapacity((int)(parallelism * numberOfBuffersToProduce));
            for (byte i = 0; i < parallelism; i = (byte)(i + 1)) {
                for (int j = 0; j < numberOfBuffersToProduce; ++j) {
                    this.channelIndexes.add(i);
                }
            }
            Collections.shuffle(this.channelIndexes);
        }

        @Override
        public BufferOrEvent getNextBufferOrEvent() throws Exception {
            if (this.channelIndexes.size() > 0) {
                byte channelIndex = this.channelIndexes.remove(0);
                return new BufferOrEvent(this.bufferProvider.requestBufferBlocking(), (int)channelIndex);
            }
            return null;
        }
    }
}

