package org.apache.flink.streaming.api.operators.async;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.class */
public class AsyncWaitOperatorTest extends TestLogger {
    /** Timeout handed to the async operators under test; call sites in this file pair it with {@code TimeUnit.MILLISECONDS}. */
    private static final long TIMEOUT = 1000;

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$ControllableAsyncFunction.class */
    private static class ControllableAsyncFunction<IN> implements AsyncFunction<IN, IN> {
        private static final long serialVersionUID = -4214078239267288636L;
        private transient CompletableFuture<Void> trigger;

        private ControllableAsyncFunction(CompletableFuture<Void> completableFuture) {
            this.trigger = (CompletableFuture) Preconditions.checkNotNull(completableFuture);
        }

        public void asyncInvoke(IN in, ResultFuture<IN> resultFuture) throws Exception {
            this.trigger.thenAccept(r5 -> {
                resultFuture.complete(Collections.singleton(in));
            });
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$IgnoreTimeoutLazyAsyncFunction.class */
    private static class IgnoreTimeoutLazyAsyncFunction extends LazyAsyncFunction {
        private static final long serialVersionUID = 1428714561365346128L;

        private IgnoreTimeoutLazyAsyncFunction() {
        }

        public void timeout(Integer num, ResultFuture<Integer> resultFuture) throws Exception {
            resultFuture.complete(Collections.singletonList(Integer.valueOf(num.intValue() * 3)));
        }

        public /* bridge */ /* synthetic */ void timeout(Object obj, ResultFuture resultFuture) throws Exception {
            timeout((Integer) obj, (ResultFuture<Integer>) resultFuture);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$LazyAsyncFunction.class */
    /**
     * {@link MyAsyncFunction} variant whose results are held back until {@link #countDown()} is
     * called; the result is then the unmodified input element.
     */
    public static class LazyAsyncFunction extends MyAsyncFunction {
        private static final long serialVersionUID = 3537791752703154670L;

        /** Shared by all instances; replaced with a fresh latch every time an instance is constructed. */
        private static CountDownLatch latch;

        public LazyAsyncFunction() {
            super();
            latch = new CountDownLatch(1);
        }

        /**
         * Submits a task to the shared executor that waits for the latch and then completes
         * {@code resultFuture} with a singleton list containing {@code num}.
         *
         * <p>No explicit {@code asyncInvoke(Object, ResultFuture)} bridge is declared: the
         * compiler generates it, and a hand-written one clashes with the inherited generic method
         * because both have the same erasure.
         */
        @Override // org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.MyAsyncFunction
        public void asyncInvoke(final Integer num, final ResultFuture<Integer> resultFuture) throws Exception {
            executorService.submit(() -> {
                try {
                    latch.await();
                } catch (InterruptedException ignored) {
                    // Deliberately best-effort: even when interrupted, the result is still
                    // completed below.
                }
                resultFuture.complete(Collections.singletonList(num));
            });
        }

        /** Releases every task currently waiting on the shared latch. */
        public static void countDown() {
            latch.countDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$MyAsyncFunction.class */
    public static class MyAsyncFunction extends RichAsyncFunction<Integer, Integer> {
        private static final long serialVersionUID = 8522411971886428444L;
        private static final long TERMINATION_TIMEOUT = 5000;
        private static final int THREAD_POOL_SIZE = 10;
        static ExecutorService executorService;
        static int counter = 0;

        private MyAsyncFunction() {
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            synchronized (MyAsyncFunction.class) {
                if (counter == 0) {
                    executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
                }
                counter++;
            }
        }

        public void close() throws Exception {
            super.close();
            freeExecutor();
        }

        private void freeExecutor() {
            synchronized (MyAsyncFunction.class) {
                counter--;
                if (counter == 0) {
                    executorService.shutdown();
                    try {
                        if (!executorService.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.MILLISECONDS)) {
                            executorService.shutdownNow();
                        }
                    } catch (InterruptedException e) {
                        executorService.shutdownNow();
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        public void asyncInvoke(final Integer num, final ResultFuture<Integer> resultFuture) throws Exception {
            executorService.submit(new Runnable() { // from class: org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.MyAsyncFunction.1
                @Override // java.lang.Runnable
                public void run() {
                    resultFuture.complete(Collections.singletonList(Integer.valueOf(num.intValue() * 2)));
                }
            });
        }

        public /* bridge */ /* synthetic */ void asyncInvoke(Object obj, ResultFuture resultFuture) throws Exception {
            asyncInvoke((Integer) obj, (ResultFuture<Integer>) resultFuture);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$NoOpAsyncFunction.class */
    public static class NoOpAsyncFunction<IN, OUT> implements AsyncFunction<IN, OUT> {
        private static final long serialVersionUID = -3060481953330480694L;

        private NoOpAsyncFunction() {
        }

        public void asyncInvoke(IN in, ResultFuture<OUT> resultFuture) throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$StreamRecordComparator.class */
    /**
     * Orders {@link StreamRecord}s by timestamp and then by their {@code Integer} value. Any
     * comparison that involves a {@link Watermark} reports equality.
     */
    public class StreamRecordComparator implements Comparator<Object> {
        private StreamRecordComparator() {
        }

        /**
         * @param obj first element; a {@link Watermark} or a {@link StreamRecord} holding an {@code Integer}
         * @param obj2 second element, same expectations as {@code obj}
         * @return 0 if either argument is a watermark; otherwise the timestamp ordering, with ties
         *     broken by the natural ordering of the record values
         */
        @Override // java.util.Comparator
        public int compare(Object obj, Object obj2) {
            if ((obj instanceof Watermark) || (obj2 instanceof Watermark)) {
                return 0;
            }
            StreamRecord<?> left = (StreamRecord<?>) obj;
            StreamRecord<?> right = (StreamRecord<?>) obj2;

            // Long.compare rather than a narrowed subtraction: casting a long difference to int
            // can truncate or overflow and flip the sign.
            int byTimestamp = Long.compare(left.getTimestamp(), right.getTimestamp());
            if (byTimestamp != 0) {
                return byTimestamp;
            }
            return ((Integer) left.getValue()).compareTo((Integer) right.getValue());
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$TestAsyncWaitOperator.class */
    private static final class TestAsyncWaitOperator<IN, OUT> extends AsyncWaitOperator<IN, OUT> {
        private static final long serialVersionUID = -8528791694746625560L;
        private final transient OneShotLatch closingLatch;

        public TestAsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction, long j, int i, AsyncDataStream.OutputMode outputMode, OneShotLatch oneShotLatch) {
            super(asyncFunction, j, i, outputMode);
            this.closingLatch = (OneShotLatch) Preconditions.checkNotNull(oneShotLatch);
        }

        public void close() throws Exception {
            this.closingLatch.trigger();
            this.checkpointingLock.notifyAll();
            super.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$UserExceptionAsyncFunction.class */
    public static class UserExceptionAsyncFunction implements AsyncFunction<Integer, Integer> {
        private static final long serialVersionUID = 6326568632967110990L;

        private UserExceptionAsyncFunction() {
        }

        public void asyncInvoke(Integer num, ResultFuture<Integer> resultFuture) throws Exception {
            resultFuture.completeExceptionally(new Exception("Test exception"));
        }

        public /* bridge */ /* synthetic */ void asyncInvoke(Object obj, ResultFuture resultFuture) throws Exception {
            asyncInvoke((Integer) obj, (ResultFuture<Integer>) resultFuture);
        }
    }

    /** Runs the event-time scenario in {@code ORDERED} output mode. */
    @Test
    public void testEventTimeOrdered() throws Exception {
        final AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.ORDERED;
        testEventTime(mode);
    }

    /** Runs the event-time scenario in {@code UNORDERED} output mode. */
    @Test
    public void testWaterMarkUnordered() throws Exception {
        final AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.UNORDERED;
        testEventTime(mode);
    }

    /**
     * Feeds three records and one watermark through an {@link AsyncWaitOperator} backed by
     * {@link MyAsyncFunction} (capacity 2) and checks the emitted elements.
     *
     * <p>In {@code ORDERED} mode the output must equal the expected sequence exactly. Otherwise
     * the watermark must sit at index 2, the last record at index 3, and the remaining records
     * are compared after sorting.
     *
     * @param outputMode output mode of the operator under test
     */
    private void testEventTime(AsyncDataStream.OutputMode outputMode) throws Exception {
        final AsyncWaitOperator<Integer, Integer> operator =
                new AsyncWaitOperator<>(new MyAsyncFunction(), TIMEOUT, 2, outputMode);
        final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE);
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        harness.open();

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(new StreamRecord<>(1, 1L));
            harness.processElement(new StreamRecord<>(2, 2L));
            harness.processWatermark(new Watermark(2L));
            harness.processElement(new StreamRecord<>(3, 3L));
        }

        // Closing the operator happens under the checkpoint lock, just like element processing.
        synchronized (harness.getCheckpointLock()) {
            harness.close();
        }

        expectedOutput.add(new StreamRecord<>(2, 1L));
        expectedOutput.add(new StreamRecord<>(4, 2L));
        expectedOutput.add(new Watermark(2L));
        expectedOutput.add(new StreamRecord<>(6, 3L));

        if (outputMode != AsyncDataStream.OutputMode.ORDERED) {
            final Object[] actual = harness.getOutput().toArray();
            Assert.assertEquals("Watermark should be at index 2", new Watermark(2L), actual[2]);
            Assert.assertEquals("StreamRecord 3 should be at the end", new StreamRecord<>(6, 3L), actual[3]);
            TestHarnessUtil.assertOutputEqualsSorted(
                    "Output for StreamRecords does not match",
                    expectedOutput,
                    harness.getOutput(),
                    new StreamRecordComparator());
        } else {
            TestHarnessUtil.assertOutputEquals(
                    "Output with watermark was not correct.", expectedOutput, harness.getOutput());
        }
    }

    /** Runs the processing-time scenario in {@code ORDERED} output mode. */
    @Test
    public void testProcessingTimeOrdered() throws Exception {
        final AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.ORDERED;
        testProcessingTime(mode);
    }

    /** Runs the processing-time scenario in {@code UNORDERED} output mode. */
    @Test
    public void testProcessingUnordered() throws Exception {
        final AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.UNORDERED;
        testProcessingTime(mode);
    }

    /**
     * Sends the records 1..8 (timestamp equal to value) through an {@link AsyncWaitOperator}
     * backed by {@link MyAsyncFunction} (capacity 6) and expects the doubled values back.
     *
     * <p>{@code ORDERED} mode compares the output sequence exactly; any other mode compares it
     * after sorting.
     *
     * @param outputMode output mode of the operator under test
     */
    private void testProcessingTime(AsyncDataStream.OutputMode outputMode) throws Exception {
        final int recordCount = 8;

        final AsyncWaitOperator<Integer, Integer> operator =
                new AsyncWaitOperator<>(new MyAsyncFunction(), TIMEOUT, 6, outputMode);
        final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE);
        final ArrayDeque<Object> expectedOutput = new ArrayDeque<>();

        harness.open();

        synchronized (harness.getCheckpointLock()) {
            for (int value = 1; value <= recordCount; value++) {
                harness.processElement(new StreamRecord<>(value, (long) value));
            }
        }

        for (int value = 1; value <= recordCount; value++) {
            expectedOutput.add(new StreamRecord<>(value * 2, (long) value));
        }

        synchronized (harness.getCheckpointLock()) {
            harness.close();
        }

        if (outputMode != AsyncDataStream.OutputMode.ORDERED) {
            TestHarnessUtil.assertOutputEqualsSorted(
                    "UNORDERED Output was not correct.",
                    expectedOutput,
                    harness.getOutput(),
                    new StreamRecordComparator());
        } else {
            TestHarnessUtil.assertOutputEquals(
                    "ORDERED Output was not correct.", expectedOutput, harness.getOutput());
        }
    }

    /**
     * Runs the operator chain built by {@link #createChainedVertex(boolean)} inside a task
     * harness and checks its output.
     *
     * <p>The chain doubles, adds one and doubles again, so inputs 5..9 are expected to come out
     * as 22, 26, 30, 34 and 38 with their timestamps unchanged. The result is compared after
     * sorting.
     */
    @Test
    public void testOperatorChainWithProcessingTime() throws Exception {
        final JobVertex chainedVertex = createChainedVertex(false);

        final OneInputStreamTaskTestHarness<Integer, Integer> harness = new OneInputStreamTaskTestHarness<>(
                OneInputStreamTask::new, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        // Run the task with the chained vertex's configuration and its head operator.
        harness.taskConfig = chainedVertex.getConfiguration();
        final StreamConfig chainHeadConfig = new StreamConfig(chainedVertex.getConfiguration());
        harness.getStreamConfig().setStreamOperator(
                chainHeadConfig.getStreamOperator(AsyncWaitOperatorTest.class.getClassLoader()));

        harness.invoke();
        harness.waitForTaskRunning();

        final long initialTimestamp = 0L;
        final int recordCount = 5;

        for (int offset = 0; offset < recordCount; offset++) {
            harness.processElement(new StreamRecord<>(5 + offset, initialTimestamp + offset));
        }

        harness.endInput();
        harness.waitForTaskCompletion();

        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        for (int offset = 0; offset < recordCount; offset++) {
            expectedOutput.add(new StreamRecord<>(22 + 4 * offset, initialTimestamp + offset));
        }

        TestHarnessUtil.assertOutputEqualsSorted(
                "Test for chained operator with AsyncWaitOperator failed",
                expectedOutput,
                harness.getOutput(),
                new StreamRecordComparator());
    }

    /**
     * Builds a job graph of the shape source, then (ordered async wait, rich map adding one,
     * unordered async wait), then (identity map in a new chain, discarding sink), and returns
     * its middle vertex.
     *
     * @param z if true the first async stage uses {@link LazyAsyncFunction}, otherwise
     *     {@link MyAsyncFunction}
     * @return the second vertex in topological order
     */
    private JobVertex createChainedVertex(boolean z) {
        final StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // The source elements only serve to construct the graph.
        org.apache.flink.streaming.api.datastream.DataStream<Integer> stream = chainEnv.fromElements(1, 2, 3);

        if (z) {
            stream = AsyncDataStream.orderedWait(stream, new LazyAsyncFunction(), TIMEOUT, TimeUnit.MILLISECONDS, 6);
        } else {
            stream = AsyncDataStream.orderedWait(stream, new MyAsyncFunction(), TIMEOUT, TimeUnit.MILLISECONDS, 6);
        }

        // The increment is only assigned in open(), so map() throws a NullPointerException if
        // this function is ever used without having been opened first.
        stream = stream.map(new RichMapFunction<Integer, Integer>() {
            private static final long serialVersionUID = 1;
            private Integer initialValue = null;

            @Override
            public void open(Configuration configuration) throws Exception {
                initialValue = 1;
            }

            @Override
            public Integer map(Integer num) throws Exception {
                return initialValue + num;
            }
        });

        stream = AsyncDataStream.unorderedWait(stream, new MyAsyncFunction(), TIMEOUT, TimeUnit.MILLISECONDS, 3);

        stream.map(new MapFunction<Integer, Integer>() {
            private static final long serialVersionUID = 5162085254238405527L;

            @Override
            public Integer map(Integer num) throws Exception {
                return num;
            }
        }).startNewChain().addSink(new DiscardingSink<Integer>());

        final JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
        Assert.assertTrue(jobGraph.getVerticesSortedTopologicallyFromSources().size() == 3);
        return (JobVertex) jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
    }

    /**
     * Checks that elements still in flight at checkpoint time are re-processed after restore.
     *
     * <p>First run: four records are fed to an operator using {@link LazyAsyncFunction}, whose
     * results are held back until the checkpoint has been reported. Second run: a new task is
     * restored from that snapshot with {@link MyAsyncFunction} and fed four more records. Its
     * output, with checkpoint barriers removed, must be the doubled values of all eight records
     * in order.
     */
    @Test
    public void testStateSnapshotAndRestore() throws Exception {
        final long checkpointId = 1L;
        final long checkpointTimestamp = 1L;

        // ---- first run: take a checkpoint while results are still pending ----
        final OneInputStreamTaskTestHarness<Integer, Integer> firstHarness = new OneInputStreamTaskTestHarness<>(
                OneInputStreamTask::new, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        firstHarness.setupOutputForSingletonOperatorChain();

        final AsyncWaitOperator<Integer, Integer> lazyOperator =
                new AsyncWaitOperator<>(new LazyAsyncFunction(), TIMEOUT, 3, AsyncDataStream.OutputMode.ORDERED);
        final StreamConfig firstConfig = firstHarness.getStreamConfig();
        final OperatorID operatorID = new OperatorID(42L, 4711L);
        firstConfig.setStreamOperator(lazyOperator);
        firstConfig.setOperatorID(operatorID);

        final TestTaskStateManager stateManager = firstHarness.getTaskStateManager();
        stateManager.setWaitForReportLatch(new OneShotLatch());

        firstHarness.invoke();
        firstHarness.waitForTaskRunning();

        final OneInputStreamTask<Integer, Integer> firstTask = firstHarness.mo101getTask();

        for (int value = 1; value <= 4; value++) {
            firstHarness.processElement(new StreamRecord<>(value, (long) value));
        }
        firstHarness.waitForInputProcessing();

        firstTask.triggerCheckpoint(
                new CheckpointMetaData(checkpointId, checkpointTimestamp),
                CheckpointOptions.forCheckpointWithDefaultLocation());

        stateManager.getWaitForReportLatch().await();
        Assert.assertEquals(checkpointId, stateManager.getReportedCheckpointId());

        // Only now let the pending async results complete.
        LazyAsyncFunction.countDown();

        firstHarness.endInput();
        firstHarness.waitForTaskCompletion();

        // ---- second run: restore from the snapshot of the first run ----
        final TaskStateSnapshot snapshot = stateManager.getLastJobManagerTaskStateSnapshot();

        final OneInputStreamTaskTestHarness<Integer, Integer> restoredHarness = new OneInputStreamTaskTestHarness<>(
                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        restoredHarness.setTaskStateSnapshot(checkpointId, snapshot);
        restoredHarness.setupOutputForSingletonOperatorChain();

        final AsyncWaitOperator<Integer, Integer> restoredOperator =
                new AsyncWaitOperator<>(new MyAsyncFunction(), TIMEOUT, 6, AsyncDataStream.OutputMode.ORDERED);
        restoredHarness.getStreamConfig().setStreamOperator(restoredOperator);
        restoredHarness.getStreamConfig().setOperatorID(operatorID);

        restoredHarness.invoke();
        restoredHarness.waitForTaskRunning();

        final OneInputStreamTask<Integer, Integer> restoredTask = restoredHarness.mo101getTask();

        for (int value = 5; value <= 7; value++) {
            restoredHarness.processElement(new StreamRecord<>(value, (long) value));
        }

        // Trigger another checkpoint in the middle of the element stream.
        restoredTask.triggerCheckpoint(
                new CheckpointMetaData(checkpointId, checkpointTimestamp),
                CheckpointOptions.forCheckpointWithDefaultLocation());

        restoredHarness.processElement(new StreamRecord<>(8, 8L));
        restoredHarness.endInput();
        restoredHarness.waitForTaskCompletion();

        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        for (int value = 1; value <= 8; value++) {
            expectedOutput.add(new StreamRecord<>(value * 2, (long) value));
        }

        // Checkpoint barriers are not part of the expected output.
        restoredHarness.getOutput().removeIf(element -> element instanceof CheckpointBarrier);

        TestHarnessUtil.assertOutputEquals(
                "StateAndRestored Test Output was not correct.", expectedOutput, restoredHarness.getOutput());
    }

    /**
     * Timeout scenario with the default timeout handling: a {@link TimeoutException} must be
     * reported and only the record {@code (2, 5)} is expected in the output.
     */
    @Test
    public void testAsyncTimeoutFailure() throws Exception {
        final LazyAsyncFunction function = new LazyAsyncFunction();
        testAsyncTimeout(function, Optional.of(TimeoutException.class), new StreamRecord<>(2, 5L));
    }

    /**
     * Timeout scenario with a function that overrides timeout handling: no failure is expected,
     * and the timed-out element appears as {@code (3, 0)} followed by {@code (2, 5)}.
     */
    @Test
    public void testAsyncTimeoutIgnore() throws Exception {
        final LazyAsyncFunction function = new IgnoreTimeoutLazyAsyncFunction();
        testAsyncTimeout(function, Optional.empty(), new StreamRecord<>(3, 0L), new StreamRecord<>(2, 5L));
    }

    /**
     * Drives an ordered {@link AsyncWaitOperator} with a 10 ms timeout: one element is processed
     * at processing time 0, another at time 5, then time advances to 11 before the lazy function
     * is released and the operator is closed.
     *
     * @param lazyAsyncFunction function under test; its results are held back until released
     * @param optional exception type that must appear in the environment's reported failure
     *     cause, or empty if the failure cause is not checked
     * @param streamRecordArr the exact output expected from the operator
     */
    private void testAsyncTimeout(LazyAsyncFunction lazyAsyncFunction, Optional<Class<? extends Throwable>> optional, StreamRecord<Integer>... streamRecordArr) throws Exception {
        final long timeout = 10L;

        final AsyncWaitOperator<Integer, Integer> operator =
                new AsyncWaitOperator<>(lazyAsyncFunction, timeout, 2, AsyncDataStream.OutputMode.ORDERED);

        final MockEnvironment environment = createMockEnvironment();
        environment.setExpectedExternalFailureCause(Throwable.class);

        final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE, environment);

        harness.open();
        harness.setProcessingTime(0L);

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(new StreamRecord<>(1, 0L));
            harness.setProcessingTime(5L);
            harness.processElement(new StreamRecord<>(2, 5L));
        }

        // Advance past the first element's deadline before letting the function complete.
        harness.setProcessingTime(11L);
        LazyAsyncFunction.countDown();

        synchronized (harness.getCheckpointLock()) {
            harness.close();
        }

        final ConcurrentLinkedQueue<Object> expectedOutput =
                new ConcurrentLinkedQueue<Object>(Arrays.asList(streamRecordArr));
        TestHarnessUtil.assertOutputEquals(
                "Output with watermark was not correct.", expectedOutput, harness.getOutput());

        if (!optional.isPresent()) {
            return;
        }
        Assert.assertTrue(environment.getActualExternalFailureCause().isPresent());
        Assert.assertTrue(
                ExceptionUtils.findThrowable(
                                (Throwable) environment.getActualExternalFailureCause().get(), optional.get())
                        .isPresent());
    }

    /**
     * Creates the mock environment used by the tests in this class: task name
     * {@code "foobarTask"}, memory size 1048576, buffer size 4096 and a
     * {@link MockInputSplitProvider}.
     *
     * @return a freshly built environment
     */
    @Nonnull
    private MockEnvironment createMockEnvironment() {
        final MockEnvironmentBuilder builder = new MockEnvironmentBuilder()
                .setTaskName("foobarTask")
                .setMemorySize(1048576L)
                .setInputSplitProvider(new MockInputSplitProvider())
                .setBufferSize(4096);
        return builder.build();
    }

    /**
     * Checks that the operator can be closed while the emission of a result is blocked.
     *
     * <p>The mocked output asserts that it is called under the checkpoint lock, signals that
     * emission has started, and then waits on the lock until the operator's {@code close()} has
     * been entered. {@link TestAsyncWaitOperator#close()} triggers that latch and notifies the
     * lock. The test fails through its timeout if closing never completes.
     */
    @Test(timeout = 10000)
    public void testClosingWithBlockedEmitter() throws Exception {
        final Object lock = new Object();

        final MockEnvironment environment = createMockEnvironment();
        final StreamTask<?, ?> containingTask = Mockito.mock(StreamTask.class);
        Mockito.when(containingTask.getEnvironment()).thenReturn(environment);
        Mockito.when(containingTask.getCheckpointLock()).thenReturn(lock);
        Mockito.when(containingTask.getProcessingTimeService()).thenReturn(new TestProcessingTimeService());

        final MockStreamConfig streamConfig = new MockStreamConfig();
        streamConfig.setTypeSerializerIn1(IntSerializer.INSTANCE);

        final OneShotLatch closingLatch = new OneShotLatch();
        final OneShotLatch outputLatch = new OneShotLatch();

        final Output<StreamRecord<Integer>> output = Mockito.mock(Output.class);
        Mockito.doAnswer(invocation -> {
            Assert.assertTrue("Output should happen under the checkpoint lock.", Thread.holdsLock(lock));
            outputLatch.trigger();
            // Block the emitting thread until close() has been entered; wait() releases the
            // lock so the test thread can acquire it and call close().
            while (!closingLatch.isTriggered()) {
                lock.wait();
            }
            return null;
        }).when(output).collect(Matchers.any(StreamRecord.class));

        final TestAsyncWaitOperator<Integer, Integer> operator = new TestAsyncWaitOperator<>(
                new MyAsyncFunction(), TIMEOUT, 1, AsyncDataStream.OutputMode.ORDERED, closingLatch);
        operator.setup(containingTask, streamConfig, output);
        operator.open();

        synchronized (lock) {
            operator.processElement(new StreamRecord<>(42));
        }

        // Only close once the output is known to be blocked inside collect().
        outputLatch.await();

        synchronized (lock) {
            operator.close();
        }
    }

    /**
     * Checks that the timeout timer registered for an element is cancelled once the element's
     * result has been emitted.
     *
     * <p>The processing time service is mocked to report time 1 and to hand out a mocked
     * {@link ScheduledFuture} for every registered timer. After one element is processed and the
     * operator is closed, the test verifies that the element was emitted, that a timer was
     * registered for {@code 1 + timeout}, and that the returned future was cancelled.
     */
    @Test
    public void testTimeoutCleanup() throws Exception {
        final Object lock = new Object();
        final long timeout = 100000L;
        final long currentProcessingTime = 1L;

        final MockEnvironment environment = createMockEnvironment();

        final ScheduledFuture<?> scheduledFuture = Mockito.mock(ScheduledFuture.class);
        final ProcessingTimeService processingTimeService = Mockito.mock(ProcessingTimeService.class);
        Mockito.when(processingTimeService.getCurrentProcessingTime()).thenReturn(currentProcessingTime);
        Mockito.doReturn(scheduledFuture)
                .when(processingTimeService)
                .registerTimer(Matchers.anyLong(), Matchers.any(ProcessingTimeCallback.class));

        final StreamTask<?, ?> containingTask = Mockito.mock(StreamTask.class);
        Mockito.when(containingTask.getEnvironment()).thenReturn(environment);
        Mockito.when(containingTask.getCheckpointLock()).thenReturn(lock);
        Mockito.when(containingTask.getProcessingTimeService()).thenReturn(processingTimeService);

        final MockStreamConfig streamConfig = new MockStreamConfig();
        streamConfig.setTypeSerializerIn1(IntSerializer.INSTANCE);

        final Output<StreamRecord<Integer>> output = Mockito.mock(Output.class);

        // The function completes immediately with its input, so the timer is never needed.
        final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
                new AsyncFunction<Integer, Integer>() {
                    private static final long serialVersionUID = -3718276118074877073L;

                    @Override
                    public void asyncInvoke(Integer num, ResultFuture<Integer> resultFuture) throws Exception {
                        resultFuture.complete(Collections.singletonList(num));
                    }
                },
                timeout,
                1,
                AsyncDataStream.OutputMode.UNORDERED);

        operator.setup(containingTask, streamConfig, output);
        operator.open();

        final StreamRecord<Integer> streamRecord = new StreamRecord<>(42, 1L);

        synchronized (lock) {
            operator.processElement(streamRecord);
        }

        synchronized (lock) {
            operator.close();
        }

        // The single input must have been emitted unchanged.
        Mockito.verify(output).collect(Matchers.eq(streamRecord));

        // The expected timestamp is computed up front. Calling the mock inside the verify(...)
        // argument list would make that nested call the verified invocation and leave
        // registerTimer unchecked.
        final long expectedTimerTimestamp = currentProcessingTime + timeout;
        Mockito.verify(processingTimeService)
                .registerTimer(Matchers.eq(expectedTimerTimestamp), Matchers.any(ProcessingTimeCallback.class));

        // The registered timeout must have been cancelled.
        Mockito.verify(scheduledFuture).cancel(Matchers.eq(true));
    }

    /**
     * Runs the user-exception scenario of {@link #testUserExceptionHandling} in
     * {@code ORDERED} output mode. The test is bounded by a two second timeout.
     */
    @Test(timeout = 2000L)
    public void testOrderedWaitUserExceptionHandling() throws Exception {
        final AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.ORDERED;
        testUserExceptionHandling(mode);
    }

    /**
     * Runs the user-exception scenario of {@link #testUserExceptionHandling} in
     * {@code UNORDERED} output mode. The test is bounded by a two second timeout.
     */
    @Test(timeout = 2000L)
    public void testUnorderedWaitUserExceptionHandling() throws Exception {
        final AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.UNORDERED;
        testUserExceptionHandling(mode);
    }

    /**
     * Feeds one element through an {@code AsyncWaitOperator} built around a
     * {@code UserExceptionAsyncFunction} and asserts that, once the operator has been closed,
     * the mock environment has recorded an external failure cause.
     *
     * @param outputMode output mode the operator under test is created with
     * @throws Exception if opening, driving or closing the harness fails
     */
    private void testUserExceptionHandling(AsyncDataStream.OutputMode outputMode) throws Exception {
        // Operator with a queue capacity of two, using the shared TIMEOUT.
        AsyncWaitOperator operatorUnderTest =
            new AsyncWaitOperator(new UserExceptionAsyncFunction(), TIMEOUT, 2, outputMode);

        // The environment has to be told up front that an external failure is expected.
        MockEnvironment environment = createMockEnvironment();
        environment.setExpectedExternalFailureCause(Throwable.class);

        OneInputStreamOperatorTestHarness harness =
            new OneInputStreamOperatorTestHarness(operatorUnderTest, IntSerializer.INSTANCE, environment);
        harness.open();

        // Element processing and close each run under the checkpoint lock, in separate critical sections.
        synchronized (harness.getCheckpointLock()) {
            harness.processElement(1, 1L);
        }

        synchronized (harness.getCheckpointLock()) {
            harness.close();
        }

        boolean failureRecorded = harness.getEnvironment().getActualExternalFailureCause().isPresent();
        Assert.assertTrue(failureRecorded);
    }

    /**
     * Runs the timeout scenario of {@link #testTimeoutExceptionHandling} in
     * {@code ORDERED} output mode.
     */
    @Test
    public void testOrderedWaitTimeoutHandling() throws Exception {
        final AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.ORDERED;
        testTimeoutExceptionHandling(mode);
    }

    /**
     * Runs the timeout scenario of {@link #testTimeoutExceptionHandling} in
     * {@code UNORDERED} output mode.
     */
    @Test
    public void testUnorderedWaitTimeoutHandling() throws Exception {
        final AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.UNORDERED;
        testTimeoutExceptionHandling(mode);
    }

    /**
     * Feeds one element through an {@code AsyncWaitOperator} built around a
     * {@code NoOpAsyncFunction} with a 10 ms timeout, advances the harness' processing time to
     * that timeout and closes the operator. Afterwards the mock environment must have recorded an
     * external failure cause, mirroring the check done in {@link #testUserExceptionHandling}.
     *
     * @param outputMode output mode the operator under test is created with
     * @throws Exception if opening, driving or closing the harness fails
     */
    private void testTimeoutExceptionHandling(AsyncDataStream.OutputMode outputMode) throws Exception {
        final long timeout = 10L;

        // NOTE(review): NoOpAsyncFunction presumably never completes the result future, so only
        // the timeout can finish the in-flight element - confirm against its definition.
        AsyncWaitOperator asyncWaitOperator = new AsyncWaitOperator(new NoOpAsyncFunction(), timeout, 2, outputMode);

        // The environment has to be told up front that an external failure is expected.
        MockEnvironment mockEnvironment = createMockEnvironment();
        mockEnvironment.setExpectedExternalFailureCause(Throwable.class);

        OneInputStreamOperatorTestHarness harness =
            new OneInputStreamOperatorTestHarness(asyncWaitOperator, IntSerializer.INSTANCE, mockEnvironment);
        harness.open();

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(1, 1L);
        }

        // Same value the original passed as a literal; assumes the harness clock starts at 0 so
        // that this reaches the element's timeout timer - TODO confirm.
        harness.setProcessingTime(timeout);

        synchronized (harness.getCheckpointLock()) {
            harness.close();
        }

        // Without this check the test would also pass if the timeout were silently swallowed.
        Assert.assertTrue(
            "The timeout should have been reported as an external failure.",
            harness.getEnvironment().getActualExternalFailureCause().isPresent());
    }

    /**
     * Takes a snapshot of an operator that holds {@code capacity + 1} elements, restores a fresh
     * operator from that snapshot and checks that the restored operator emits all
     * {@code capacity + 1} elements in their original order.
     *
     * <p>Phase 1 pushes {@code capacity} elements into an {@code ORDERED} operator whose
     * {@code ControllableAsyncFunction} is driven by a not-yet-completed future, then lets a
     * background thread submit one more element before the snapshot is taken. Phase 2 restores a
     * new operator (driven by an already completed future) from the snapshot, opens and closes it,
     * and compares its output with the expected values.
     *
     * @throws Exception if either harness fails or the test does not finish within 10 seconds
     */
    @Test(timeout = 10000)
    public void testRestartWithFullQueue() throws Exception {
        final int capacity = 10;

        // ---- Phase 1: create a snapshot that contains capacity + 1 elements ----
        final CompletableFuture<Void> trigger = new CompletableFuture<>();
        final OneInputStreamOperatorTestHarness snapshotHarness = new OneInputStreamOperatorTestHarness(
            (OneInputStreamOperator) new AsyncWaitOperator(
                new ControllableAsyncFunction(trigger),
                TIMEOUT,
                capacity,
                AsyncDataStream.OutputMode.ORDERED),
            (TypeSerializer) IntSerializer.INSTANCE);

        snapshotHarness.open();

        final OperatorSubtaskState snapshot;
        final ArrayList<Object> expectedOutput = new ArrayList<>(capacity + 1);

        try {
            synchronized (snapshotHarness.getCheckpointLock()) {
                for (int i = 0; i < capacity; i++) {
                    snapshotHarness.processElement(i, 0L);
                    expectedOutput.add(i);
                }
            }

            expectedOutput.add(capacity);

            final OneShotLatch lastElement = new OneShotLatch();

            // Submits the element that exceeds the capacity from a separate thread.
            final CheckedThread lastElementWriter = new CheckedThread() {
                @Override
                public void go() throws Exception {
                    synchronized (snapshotHarness.getCheckpointLock()) {
                        lastElement.trigger();
                        snapshotHarness.processElement(capacity, 0L);
                    }
                }
            };

            lastElementWriter.start();

            lastElement.await();

            // The writer triggers the latch while it already holds the checkpoint lock, so this
            // block can only be entered after the writer has left its critical section or is
            // waiting inside processElement.
            synchronized (snapshotHarness.getCheckpointLock()) {
                snapshot = snapshotHarness.snapshot(0L, 0L);
            }
        } finally {
            // Complete the pending async calls before closing, on success and on failure alike,
            // so that close() is not left waiting for elements that would never finish.
            trigger.complete(null);

            synchronized (snapshotHarness.getCheckpointLock()) {
                snapshotHarness.close();
            }
        }

        // ---- Phase 2: restore from the snapshot and check the emitted elements ----
        // This phase is deliberately outside the try/finally above: a failure here must not close
        // the snapshot harness a second time.
        final OneInputStreamOperatorTestHarness recoverHarness = new OneInputStreamOperatorTestHarness(
            (OneInputStreamOperator) new AsyncWaitOperator(
                new ControllableAsyncFunction(CompletableFuture.completedFuture(null)),
                TIMEOUT,
                capacity,
                AsyncDataStream.OutputMode.ORDERED),
            (TypeSerializer) IntSerializer.INSTANCE);

        recoverHarness.initializeState(snapshot);

        synchronized (recoverHarness.getCheckpointLock()) {
            recoverHarness.open();
        }

        synchronized (recoverHarness.getCheckpointLock()) {
            recoverHarness.close();
        }

        final ConcurrentLinkedQueue<Object> output = recoverHarness.getOutput();

        Assert.assertThat(output.size(), org.hamcrest.Matchers.equalTo(capacity + 1));

        final ArrayList<Object> outputElements = new ArrayList<>(capacity + 1);

        for (int i = 0; i < capacity + 1; i++) {
            final StreamRecord<?> record = (StreamRecord<?>) output.poll();
            outputElements.add(record.getValue());
        }

        Assert.assertThat(outputElements, org.hamcrest.Matchers.equalTo(expectedOutput));
    }
}
