/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.stats;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.StateForTask;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStats;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class SimpleCheckpointStatsTrackerTest {
    private static final Random RAND = new Random();

    @Test
    public void testNoCompletedCheckpointYet() throws Exception {
        SimpleCheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(0, new ExecutionVertex[0]);
        Assert.assertFalse((boolean)tracker.getJobStats().isDefined());
        Assert.assertFalse((boolean)tracker.getOperatorStats(new JobVertexID()).isDefined());
    }

    @Test
    public void testRandomStats() throws Exception {
        CompletedCheckpoint[] checkpoints = SimpleCheckpointStatsTrackerTest.generateRandomCheckpoints(16);
        ExecutionVertex[] tasksToWaitFor = this.createTasksToWaitFor(checkpoints[0]);
        SimpleCheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
        for (int i = 0; i < checkpoints.length; ++i) {
            CompletedCheckpoint checkpoint = checkpoints[i];
            tracker.onCompletedCheckpoint(checkpoint);
            SimpleCheckpointStatsTrackerTest.verifyJobStats((CheckpointStatsTracker)tracker, 10, Arrays.copyOfRange(checkpoints, 0, i + 1));
            SimpleCheckpointStatsTrackerTest.verifySubtaskStats((CheckpointStatsTracker)tracker, tasksToWaitFor, checkpoint);
        }
    }

    @Test
    public void testIllegalOperatorId() throws Exception {
        CompletedCheckpoint[] checkpoints = SimpleCheckpointStatsTrackerTest.generateRandomCheckpoints(16);
        ExecutionVertex[] tasksToWaitFor = this.createTasksToWaitFor(checkpoints[0]);
        SimpleCheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
        for (CompletedCheckpoint checkpoint : checkpoints) {
            tracker.onCompletedCheckpoint(checkpoint);
        }
        Assert.assertTrue((boolean)tracker.getJobStats().isDefined());
        Assert.assertTrue((boolean)tracker.getOperatorStats(new JobVertexID()).isEmpty());
    }

    @Test
    public void testCompletedCheckpointReordering() throws Exception {
        CompletedCheckpoint[] checkpoints = SimpleCheckpointStatsTrackerTest.generateRandomCheckpoints(2);
        ExecutionVertex[] tasksToWaitFor = this.createTasksToWaitFor(checkpoints[0]);
        SimpleCheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
        tracker.onCompletedCheckpoint(checkpoints[1]);
        SimpleCheckpointStatsTrackerTest.verifyJobStats((CheckpointStatsTracker)tracker, 10, new CompletedCheckpoint[]{checkpoints[1]});
        SimpleCheckpointStatsTrackerTest.verifySubtaskStats((CheckpointStatsTracker)tracker, tasksToWaitFor, checkpoints[1]);
        tracker.onCompletedCheckpoint(checkpoints[0]);
        SimpleCheckpointStatsTrackerTest.verifyJobStats((CheckpointStatsTracker)tracker, 10, checkpoints);
        SimpleCheckpointStatsTrackerTest.verifySubtaskStats((CheckpointStatsTracker)tracker, tasksToWaitFor, checkpoints[1]);
    }

    @Test
    public void testOperatorStateCachedClearedOnNewCheckpoint() throws Exception {
        CompletedCheckpoint[] checkpoints = SimpleCheckpointStatsTrackerTest.generateRandomCheckpoints(2);
        ExecutionVertex[] tasksToWaitFor = this.createTasksToWaitFor(checkpoints[0]);
        SimpleCheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor);
        tracker.onCompletedCheckpoint(checkpoints[0]);
        JobVertexID operatorId = ((StateForTask)checkpoints[0].getStates().get(0)).getOperatorId();
        Assert.assertNotNull((Object)tracker.getOperatorStats(((StateForTask)checkpoints[0].getStates().get(0)).getOperatorId()));
        Field f = tracker.getClass().getDeclaredField("operatorStatsCache");
        f.setAccessible(true);
        Map cache = (Map)f.get(tracker);
        Assert.assertTrue((boolean)cache.containsKey(operatorId));
        tracker.onCompletedCheckpoint(checkpoints[1]);
        Assert.assertTrue((boolean)cache.isEmpty());
    }

    private static void verifyJobStats(CheckpointStatsTracker tracker, int historySize, CompletedCheckpoint[] checkpoints) {
        Assert.assertTrue((boolean)tracker.getJobStats().isDefined());
        JobCheckpointStats jobStats = (JobCheckpointStats)tracker.getJobStats().get();
        List history = jobStats.getRecentHistory();
        if (historySize > checkpoints.length) {
            Assert.assertEquals((long)checkpoints.length, (long)history.size());
        } else {
            Assert.assertEquals((long)historySize, (long)history.size());
        }
        Assert.assertTrue((checkpoints.length >= history.size() ? 1 : 0) != 0);
        for (int i = 0; i < history.size(); ++i) {
            CheckpointStats actualStats = (CheckpointStats)history.get(history.size() - i - 1);
            CompletedCheckpoint checkpoint = checkpoints[checkpoints.length - 1 - i];
            long stateSize = 0L;
            for (StateForTask state : checkpoint.getStates()) {
                stateSize += state.getStateSize();
            }
            CheckpointStats expectedStats = new CheckpointStats(checkpoint.getCheckpointID(), checkpoint.getTimestamp(), checkpoint.getDuration(), stateSize);
            Assert.assertEquals((Object)expectedStats, (Object)actualStats);
        }
        long minDuration = Long.MAX_VALUE;
        long maxDuration = Long.MIN_VALUE;
        long totalDuration = 0L;
        long minStateSize = Long.MAX_VALUE;
        long maxStateSize = Long.MIN_VALUE;
        long totalStateSize = 0L;
        long count = 0L;
        for (CompletedCheckpoint checkpoint : checkpoints) {
            ++count;
            if (checkpoint.getDuration() < minDuration) {
                minDuration = checkpoint.getDuration();
            }
            if (checkpoint.getDuration() > maxDuration) {
                maxDuration = checkpoint.getDuration();
            }
            totalDuration += checkpoint.getDuration();
            long stateSize = 0L;
            for (StateForTask state : checkpoint.getStates()) {
                stateSize += state.getStateSize();
            }
            if (stateSize < minStateSize) {
                minStateSize = stateSize;
            }
            if (stateSize > maxStateSize) {
                maxStateSize = stateSize;
            }
            totalStateSize += stateSize;
        }
        Assert.assertEquals((long)count, (long)jobStats.getCount());
        Assert.assertEquals((long)minDuration, (long)jobStats.getMinDuration());
        Assert.assertEquals((long)maxDuration, (long)jobStats.getMaxDuration());
        Assert.assertEquals((long)(totalDuration / count), (long)jobStats.getAverageDuration());
        Assert.assertEquals((long)minStateSize, (long)jobStats.getMinStateSize());
        Assert.assertEquals((long)maxStateSize, (long)jobStats.getMaxStateSize());
        Assert.assertEquals((long)(totalStateSize / count), (long)jobStats.getAverageStateSize());
    }

    private static void verifySubtaskStats(CheckpointStatsTracker tracker, ExecutionVertex[] tasksToWaitFor, CompletedCheckpoint checkpoint) {
        for (ExecutionVertex vertex : tasksToWaitFor) {
            JobVertexID operatorId = vertex.getJobvertexId();
            int parallelism = vertex.getTotalNumberOfParallelSubtasks();
            OperatorCheckpointStats actualStats = (OperatorCheckpointStats)tracker.getOperatorStats(operatorId).get();
            long operatorDuration = Long.MIN_VALUE;
            long operatorStateSize = 0L;
            long[][] expectedSubTaskStats = new long[parallelism][2];
            for (int i = 0; i < parallelism; ++i) {
                long duration = -1L;
                long stateSize = -1L;
                for (StateForTask state : checkpoint.getStates()) {
                    if (!state.getOperatorId().equals((Object)operatorId) || state.getSubtask() != i) continue;
                    duration = state.getDuration();
                    stateSize = state.getStateSize();
                }
                expectedSubTaskStats[i][0] = duration;
                expectedSubTaskStats[i][1] = stateSize;
            }
            OperatorCheckpointStats expectedStats = new OperatorCheckpointStats(checkpoint.getCheckpointID(), checkpoint.getTimestamp(), operatorDuration, operatorStateSize, expectedSubTaskStats);
            Assert.assertEquals((Object)expectedStats, (Object)actualStats);
        }
    }

    private static CompletedCheckpoint[] generateRandomCheckpoints(int numCheckpoints) throws IOException {
        int i;
        JobID jobId = new JobID();
        int minNumOperators = 4;
        int maxNumOperators = 32;
        int minParallelism = 4;
        int maxParallelism = 16;
        long minStateSize = Integer.MAX_VALUE;
        long maxStateSize = Long.MAX_VALUE;
        CompletedCheckpoint[] checkpoints = new CompletedCheckpoint[numCheckpoints];
        int numOperators = RAND.nextInt(maxNumOperators - minNumOperators + 1) + minNumOperators;
        JobVertexID[] operatorIds = new JobVertexID[numOperators];
        int[] operatorParallelism = new int[numOperators];
        for (i = 0; i < numOperators; ++i) {
            operatorIds[i] = new JobVertexID();
            operatorParallelism[i] = RAND.nextInt(maxParallelism - minParallelism + 1) + minParallelism;
        }
        for (i = 0; i < numCheckpoints; ++i) {
            long triggerTimestamp = System.currentTimeMillis();
            int maxDuration = RAND.nextInt(129);
            ArrayList<StateForTask> states = new ArrayList<StateForTask>();
            int completionDuration = 0;
            for (int operatorIndex = 0; operatorIndex < numOperators; ++operatorIndex) {
                JobVertexID operatorId = operatorIds[operatorIndex];
                int parallelism = operatorParallelism[operatorIndex];
                for (int subtaskIndex = 0; subtaskIndex < parallelism; ++subtaskIndex) {
                    int duration = RAND.nextInt(maxDuration + 1);
                    if (duration > completionDuration) {
                        completionDuration = duration;
                    }
                    states.add(new StateForTask(new SerializedValue(null), minStateSize + (long)(RAND.nextDouble() * (double)(maxStateSize - minStateSize)), operatorId, subtaskIndex, (long)duration));
                }
            }
            long completionTimestamp = triggerTimestamp + (long)completionDuration + (long)RAND.nextInt(10);
            checkpoints[i] = new CompletedCheckpoint(jobId, (long)i, triggerTimestamp, completionTimestamp, states);
        }
        return checkpoints;
    }

    private ExecutionVertex[] createTasksToWaitFor(CompletedCheckpoint checkpoint) {
        HashMap<JobVertexID, Integer> operators = new HashMap<JobVertexID, Integer>();
        for (StateForTask state : checkpoint.getStates()) {
            Integer parallelism = (Integer)operators.get(state.getOperatorId());
            if (parallelism == null) {
                operators.put(state.getOperatorId(), state.getSubtask() + 1);
                continue;
            }
            if (parallelism >= state.getSubtask() + 1) continue;
            operators.put(state.getOperatorId(), state.getSubtask() + 1);
        }
        ExecutionVertex[] tasksToWaitFor = new ExecutionVertex[operators.size()];
        int i = 0;
        for (JobVertexID operatorId : operators.keySet()) {
            tasksToWaitFor[i] = (ExecutionVertex)Mockito.mock(ExecutionVertex.class);
            Mockito.when((Object)tasksToWaitFor[i].getJobvertexId()).thenReturn((Object)operatorId);
            Mockito.when((Object)tasksToWaitFor[i].getTotalNumberOfParallelSubtasks()).thenReturn(operators.get(operatorId));
            ++i;
        }
        return tasksToWaitFor;
    }
}

