/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSample;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StackTraceSampleCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(StackTraceSampleCoordinator.class);
    private static final int NUM_GHOST_SAMPLE_IDS = 10;
    private final Object lock = new Object();
    private final Executor executor;
    private final long sampleTimeout;
    private final Map<Integer, PendingStackTraceSample> pendingSamples = new HashMap<Integer, PendingStackTraceSample>();
    private final ArrayDeque<Integer> recentPendingSamples = new ArrayDeque(10);
    private int sampleIdCounter;
    private boolean isShutDown;

    public StackTraceSampleCoordinator(Executor executor, long sampleTimeout) {
        Preconditions.checkArgument((sampleTimeout >= 0L ? 1 : 0) != 0);
        this.executor = (Executor)Preconditions.checkNotNull((Object)executor);
        this.sampleTimeout = sampleTimeout;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<StackTraceSample> triggerStackTraceSample(ExecutionVertex[] tasksToSample, int numSamples, Time delayBetweenSamples, int maxStackTraceDepth) {
        Preconditions.checkNotNull((Object)tasksToSample, (String)"Tasks to sample");
        Preconditions.checkArgument((tasksToSample.length >= 1 ? 1 : 0) != 0, (Object)"No tasks to sample");
        Preconditions.checkArgument((numSamples >= 1 ? 1 : 0) != 0, (Object)"No number of samples");
        Preconditions.checkArgument((maxStackTraceDepth >= 0 ? 1 : 0) != 0, (Object)"Negative maximum stack trace depth");
        ExecutionAttemptID[] triggerIds = new ExecutionAttemptID[tasksToSample.length];
        Execution[] executions = new Execution[tasksToSample.length];
        for (int i = 0; i < triggerIds.length; ++i) {
            Execution execution = tasksToSample[i].getCurrentExecutionAttempt();
            if (execution == null || execution.getState() != ExecutionState.RUNNING) {
                return FutureUtils.completedExceptionally(new IllegalStateException("Task " + tasksToSample[i].getTaskNameWithSubtaskIndex() + " is not running."));
            }
            executions[i] = execution;
            triggerIds[i] = execution.getAttemptId();
        }
        Object object = this.lock;
        synchronized (object) {
            if (this.isShutDown) {
                return FutureUtils.completedExceptionally(new IllegalStateException("Shut down"));
            }
            int sampleId = this.sampleIdCounter++;
            LOG.debug("Triggering stack trace sample {}", (Object)sampleId);
            PendingStackTraceSample pending = new PendingStackTraceSample(sampleId, triggerIds);
            long expectedDuration = (long)numSamples * delayBetweenSamples.toMilliseconds();
            Time timeout = Time.milliseconds((long)(expectedDuration + this.sampleTimeout));
            this.pendingSamples.put(sampleId, pending);
            for (Execution execution : executions) {
                CompletableFuture<StackTraceSampleResponse> stackTraceSampleFuture = execution.requestStackTraceSample(sampleId, numSamples, delayBetweenSamples, maxStackTraceDepth, timeout);
                stackTraceSampleFuture.handleAsync((stackTraceSampleResponse, throwable) -> {
                    if (stackTraceSampleResponse != null) {
                        this.collectStackTraces(stackTraceSampleResponse.getSampleId(), stackTraceSampleResponse.getExecutionAttemptID(), stackTraceSampleResponse.getSamples());
                    } else {
                        this.cancelStackTraceSample(sampleId, (Throwable)throwable);
                    }
                    return null;
                }, this.executor);
            }
            return pending.getStackTraceSampleFuture();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancelStackTraceSample(int sampleId, Throwable cause) {
        Object object = this.lock;
        synchronized (object) {
            if (this.isShutDown) {
                return;
            }
            PendingStackTraceSample sample = this.pendingSamples.remove(sampleId);
            if (sample != null) {
                if (cause != null) {
                    LOG.info("Cancelling sample " + sampleId, cause);
                } else {
                    LOG.info("Cancelling sample {}", (Object)sampleId);
                }
                sample.discard(cause);
                this.rememberRecentSampleId(sampleId);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutDown() {
        Object object = this.lock;
        synchronized (object) {
            if (!this.isShutDown) {
                LOG.info("Shutting down stack trace sample coordinator.");
                for (PendingStackTraceSample pending : this.pendingSamples.values()) {
                    pending.discard(new RuntimeException("Shut down"));
                }
                this.pendingSamples.clear();
                this.isShutDown = true;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void collectStackTraces(int sampleId, ExecutionAttemptID executionId, List<StackTraceElement[]> stackTraces) {
        Object object = this.lock;
        synchronized (object) {
            PendingStackTraceSample pending;
            if (this.isShutDown) {
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Collecting stack trace sample {} of task {}", (Object)sampleId, (Object)executionId);
            }
            if ((pending = this.pendingSamples.get(sampleId)) != null) {
                pending.collectStackTraces(executionId, stackTraces);
                if (pending.isComplete()) {
                    this.pendingSamples.remove(sampleId);
                    this.rememberRecentSampleId(sampleId);
                    pending.completePromiseAndDiscard();
                }
            } else if (this.recentPendingSamples.contains(sampleId)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received late stack trace sample {} of task {}", (Object)sampleId, (Object)executionId);
                }
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Unknown sample ID " + sampleId);
            }
        }
    }

    private void rememberRecentSampleId(int sampleId) {
        if (this.recentPendingSamples.size() >= 10) {
            this.recentPendingSamples.removeFirst();
        }
        this.recentPendingSamples.addLast(sampleId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    int getNumberOfPendingSamples() {
        Object object = this.lock;
        synchronized (object) {
            return this.pendingSamples.size();
        }
    }

    private static class PendingStackTraceSample {
        private final int sampleId;
        private final long startTime;
        private final Set<ExecutionAttemptID> pendingTasks;
        private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
        private final CompletableFuture<StackTraceSample> stackTraceFuture;
        private boolean isDiscarded;

        PendingStackTraceSample(int sampleId, ExecutionAttemptID[] tasksToCollect) {
            this.sampleId = sampleId;
            this.startTime = System.currentTimeMillis();
            this.pendingTasks = new HashSet<ExecutionAttemptID>(Arrays.asList(tasksToCollect));
            this.stackTracesByTask = Maps.newHashMapWithExpectedSize((int)tasksToCollect.length);
            this.stackTraceFuture = new CompletableFuture();
        }

        int getSampleId() {
            return this.sampleId;
        }

        long getStartTime() {
            return this.startTime;
        }

        boolean isDiscarded() {
            return this.isDiscarded;
        }

        boolean isComplete() {
            if (this.isDiscarded) {
                throw new IllegalStateException("Discarded");
            }
            return this.pendingTasks.isEmpty();
        }

        void discard(Throwable cause) {
            if (!this.isDiscarded) {
                this.pendingTasks.clear();
                this.stackTracesByTask.clear();
                this.stackTraceFuture.completeExceptionally(new RuntimeException("Discarded", cause));
                this.isDiscarded = true;
            }
        }

        void collectStackTraces(ExecutionAttemptID executionId, List<StackTraceElement[]> stackTraces) {
            if (this.isDiscarded) {
                throw new IllegalStateException("Discarded");
            }
            if (!this.pendingTasks.remove((Object)executionId)) {
                if (this.isComplete()) {
                    throw new IllegalStateException("Completed");
                }
                throw new IllegalArgumentException("Unknown task " + (Object)((Object)executionId));
            }
            this.stackTracesByTask.put(executionId, Collections.unmodifiableList(stackTraces));
        }

        void completePromiseAndDiscard() {
            if (!this.isComplete()) {
                throw new IllegalStateException("Not completed yet");
            }
            this.isDiscarded = true;
            long endTime = System.currentTimeMillis();
            StackTraceSample stackTraceSample = new StackTraceSample(this.sampleId, this.startTime, endTime, this.stackTracesByTask);
            this.stackTraceFuture.complete(stackTraceSample);
        }

        CompletableFuture<StackTraceSample> getStackTraceSampleFuture() {
            return this.stackTraceFuture;
        }
    }
}

