package org.apache.flink.runtime.execution;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.Buffer;
import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPool;
import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.io.network.channels.OutputChannel;
import org.apache.flink.runtime.io.network.gates.GateID;
import org.apache.flink.runtime.io.network.gates.InputGate;
import org.apache.flink.runtime.io.network.gates.OutputGate;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/execution/RuntimeEnvironment.class */
/**
 * The environment in which a single task (an {@link AbstractInvokable}) is set up and executed.
 *
 * <p>The environment instantiates the invokable, keeps track of the input and output gates the
 * invokable registers, exposes the job/task configuration and the runtime services handed to the
 * constructor, and, as a {@link Runnable}, drives the invokable's execution in {@link #run()}.
 *
 * <p>It also acts as the {@link BufferProvider} for the task's output side by delegating to a
 * {@link LocalBufferPool} that is created in {@link #registerGlobalBufferPool(GlobalBufferPool)}.
 * All buffer related methods require that pool to have been registered first; until then the
 * pool reference is {@code null}.
 */
public class RuntimeEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner, Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(RuntimeEnvironment.class);

    /** Thread group that all task threads created by {@link #getExecutingThread()} belong to. */
    private static final ThreadGroup TASK_THREADS = new ThreadGroup("Task Threads");

    /** Interval (in milliseconds) between two checks whether all input gates are closed. */
    private static final int SLEEPINTERVAL = 100;

    /** The task that owns this environment; receives the state notifications issued by {@link #run()}. */
    private final Task owner;

    private final Configuration jobConfiguration;

    private final Configuration taskConfiguration;

    /** Class loader the invokable class is loaded with; also set as context class loader in {@link #run()}. */
    private final ClassLoader userCodeClassLoader;

    private final Class<? extends AbstractInvokable> invokableClass;

    /** Instance of {@link #invokableClass} that is executed by this environment. */
    private final AbstractInvokable invokable;

    private final MemoryManager memoryManager;

    private final IOManager ioManager;

    private final InputSplitProvider inputSplitProvider;

    /**
     * Thread executing {@link #run()}; created lazily by {@link #getExecutingThread()}. Volatile
     * because {@link #cancelExecution()} reads it without holding the monitor it is written under.
     */
    private volatile Thread executingThread;

    private final AccumulatorProtocol accumulatorProtocolProxy;

    /** Pool backing the {@link BufferProvider} methods; {@code null} until a global pool is registered. */
    private LocalBufferPool outputBufferPool;

    /** Output gates in registration order. */
    private final List<OutputGate> outputGates = new ArrayList<>();

    /** Input gates in registration order. */
    private final List<InputGate<? extends IOReadableWritable>> inputGates = new ArrayList<>();

    /** Source of {@link #getNextUnboundInputGateID()}; nothing in this class adds elements to it. */
    private final Queue<GateID> unboundInputGateIDs = new ArrayDeque<>();

    private final Map<String, FutureTask<Path>> cacheCopyTasks = new HashMap<>();

    /** Set once by the first call to {@link #cancelExecution()}. */
    private final AtomicBoolean canceled = new AtomicBoolean();

    /**
     * Creates the environment, instantiates the invokable and wires up its gates.
     *
     * <p>The invokable class is loaded through {@code classLoader}, instantiated via its no-arg
     * constructor, given this environment and asked to register its inputs and outputs. The gates
     * registered that way are then initialized, position by position, from the gate descriptors
     * of the deployment descriptor.
     *
     * @param task the task owning this environment, must not be {@code null}
     * @param taskDeploymentDescriptor descriptor providing the invokable class name, the
     *        configurations and the gate descriptors
     * @param classLoader class loader for the invokable class, must not be {@code null}
     * @param memoryManager must not be {@code null}
     * @param ioManager must not be {@code null}
     * @param inputSplitProvider must not be {@code null}
     * @param accumulatorProtocol must not be {@code null}
     * @throws Exception if the invokable class cannot be loaded or instantiated, if the number of
     *         registered gates differs from the number of gate descriptors, or if registering or
     *         initializing the gates fails
     */
    public RuntimeEnvironment(Task task, TaskDeploymentDescriptor taskDeploymentDescriptor, ClassLoader classLoader,
            MemoryManager memoryManager, IOManager ioManager, InputSplitProvider inputSplitProvider,
            AccumulatorProtocol accumulatorProtocol) throws Exception {
        Preconditions.checkNotNull(task);
        Preconditions.checkNotNull(memoryManager);
        Preconditions.checkNotNull(ioManager);
        Preconditions.checkNotNull(inputSplitProvider);
        Preconditions.checkNotNull(accumulatorProtocol);
        Preconditions.checkNotNull(classLoader);

        this.owner = task;
        this.memoryManager = memoryManager;
        this.ioManager = ioManager;
        this.inputSplitProvider = inputSplitProvider;
        this.accumulatorProtocolProxy = accumulatorProtocol;
        this.userCodeClassLoader = classLoader;

        // Phase 1: load the invokable class. Kept in its own try block so that only genuine
        // class loading problems are reported as such.
        final Class<? extends AbstractInvokable> loadedClass;
        try {
            loadedClass = Class.forName(taskDeploymentDescriptor.getInvokableClassName(), true, classLoader)
                    .asSubclass(AbstractInvokable.class);
        } catch (Throwable t) {
            throw new Exception("Could not load invokable class.", t);
        }
        this.invokableClass = loadedClass;

        // Phase 2: instantiate it.
        final AbstractInvokable instance;
        try {
            instance = loadedClass.newInstance();
        } catch (Throwable t) {
            throw new Exception("Could not instantiate the invokable class.", t);
        }
        this.invokable = instance;

        this.jobConfiguration = taskDeploymentDescriptor.getJobConfiguration();
        this.taskConfiguration = taskDeploymentDescriptor.getTaskConfiguration();

        // Phase 3: let the invokable register its gates and bind them to the descriptors.
        try {
            this.invokable.setEnvironment(this);
            this.invokable.registerInputOutput();

            final List<GateDeploymentDescriptor> inputGateDescriptors = taskDeploymentDescriptor.getInputGates();
            final List<GateDeploymentDescriptor> outputGateDescriptors = taskDeploymentDescriptor.getOutputGates();

            if (this.inputGates.size() != inputGateDescriptors.size()) {
                throw new Exception("The number of readers created in 'registerInputOutput()' is different than the number of connected incoming edges in the job graph.");
            }
            if (this.outputGates.size() != outputGateDescriptors.size()) {
                throw new Exception("The number of writers created in 'registerInputOutput()' is different than the number of connected outgoing edges in the job graph.");
            }

            for (int i = 0; i < inputGateDescriptors.size(); i++) {
                this.inputGates.get(i).initializeChannels(inputGateDescriptors.get(i));
            }
            for (int i = 0; i < outputGateDescriptors.size(); i++) {
                this.outputGates.get(i).initializeChannels(outputGateDescriptors.get(i));
            }
        } catch (Exception e) {
            // Already carries an accurate description of what went wrong.
            throw e;
        } catch (Throwable t) {
            // Errors are still converted so that, as before, only Exceptions leave the constructor.
            throw new Exception("Could not set up the input and output gates of the task.", t);
        }
    }

    /**
     * Returns the invokable instance created by the constructor.
     *
     * @return the invokable executed by this environment
     */
    public AbstractInvokable getInvokable() {
        return this.invokable;
    }

    /** {@inheritDoc} Delegates to the owning task. */
    @Override
    public JobID getJobID() {
        return this.owner.getJobID();
    }

    /**
     * Removes and returns the head of the queue of unbound input gate IDs.
     *
     * @return the next unbound gate ID, or {@code null} if the queue is empty
     */
    @Override
    public GateID getNextUnboundInputGateID() {
        return this.unboundInputGateIDs.poll();
    }

    /**
     * Creates a new output gate with a fresh {@link GateID} and appends it to the list of output
     * gates. The gate's index is the number of output gates registered before this call.
     *
     * @return the newly registered output gate
     */
    @Override
    public OutputGate createAndRegisterOutputGate() {
        final OutputGate gate = new OutputGate(getJobID(), new GateID(), getNumberOfOutputGates());
        this.outputGates.add(gate);
        return gate;
    }

    /**
     * Executes the invokable and reports the outcome to the owning task.
     *
     * <p>If the owner is already canceled or failed when the method is entered, it only signals
     * {@code cancelingDone()} and returns. Otherwise the invokable is invoked with the user code
     * class loader as context class loader; afterwards the input gates are closed, the output
     * gates are asked to close, and the method waits for both sides to be closed before marking
     * the owner as finished. Any throwable on that path leads to either {@code cancelingDone()}
     * (owner canceled/failed, or a {@link CancelTaskException}) or {@code markFailed(...)}.
     * Channel resources are released on every path past the initial check.
     */
    @Override
    public void run() {
        // NOTE: the statement order in this method is deliberate; in particular the channel
        // resources are released before the owner is notified on the failure path.
        try {
            // Quick exit when the task was canceled/failed before this thread got to run.
            if (this.owner.isCanceledOrFailed()) {
                this.owner.cancelingDone();
                return;
            }
            try {
                Thread.currentThread().setContextClassLoader(this.userCodeClassLoader);
                this.invokable.invoke();

                // The invokable may have returned because it was canceled.
                if (this.owner.isCanceledOrFailed()) {
                    throw new CancelTaskException();
                }

                closeInputGates();
                requestAllOutputGatesToClose();
                waitForInputChannelsToBeClosed();
                waitForOutputChannelsToBeClosed();

                if (this.owner.isCanceledOrFailed()) {
                    throw new CancelTaskException();
                }
                if (!this.owner.markAsFinished()) {
                    throw new Exception("Could not notify job manager that the task is finished.");
                }
                releaseAllChannelResources();
            } catch (Throwable t) {
                if (!this.owner.isCanceledOrFailed()) {
                    // Failure originated in this thread: give the invokable a chance to clean up.
                    try {
                        this.invokable.cancel();
                    } catch (Throwable cancelError) {
                        LOG.error("Error while canceling the task", cancelError);
                    }
                }
                releaseAllChannelResources();
                if (this.owner.isCanceledOrFailed() || (t instanceof CancelTaskException)) {
                    this.owner.cancelingDone();
                } else {
                    this.owner.markFailed(t);
                }
                releaseAllChannelResources();
            }
        } catch (Throwable t) {
            // Something failed outside the inner handler (e.g. while notifying the owner).
            releaseAllChannelResources();
            throw t;
        }
    }

    /**
     * Creates a new input gate with a fresh {@link GateID} and appends it to the list of input
     * gates. The gate's index is the number of input gates registered before this call.
     *
     * @param <T> record type of the gate
     * @return the newly registered input gate
     */
    @Override
    public <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate() {
        final InputGate<T> gate = new InputGate<>(getJobID(), new GateID(), getNumberOfInputGates());
        this.inputGates.add(gate);
        return gate;
    }

    /** {@inheritDoc} */
    @Override
    public int getNumberOfOutputGates() {
        return this.outputGates.size();
    }

    /** {@inheritDoc} */
    @Override
    public int getNumberOfInputGates() {
        return this.inputGates.size();
    }

    /**
     * Sums the channel counts of all registered output gates.
     *
     * @return total number of output channels
     */
    @Override
    public int getNumberOfOutputChannels() {
        int total = 0;
        for (OutputGate gate : this.outputGates) {
            total += gate.getNumChannels();
        }
        return total;
    }

    /**
     * Sums the channel counts of all registered input gates.
     *
     * @return total number of input channels
     */
    @Override
    public int getNumberOfInputChannels() {
        int total = 0;
        for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
            total += gate.getNumberOfInputChannels();
        }
        return total;
    }

    /**
     * Returns the input gate registered at the given position.
     *
     * @param i zero-based index of the gate
     * @return the gate, or {@code null} if {@code i} is negative or not smaller than the number
     *         of registered input gates
     */
    public InputGate<? extends IOReadableWritable> getInputGate(int i) {
        if (i >= 0 && i < this.inputGates.size()) {
            return this.inputGates.get(i);
        }
        return null;
    }

    /**
     * Returns the output gate registered at the given position.
     *
     * @param i zero-based index of the gate
     * @return the gate, or {@code null} if {@code i} is negative or not smaller than the number
     *         of registered output gates
     */
    public OutputGate getOutputGate(int i) {
        if (i >= 0 && i < this.outputGates.size()) {
            return this.outputGates.get(i);
        }
        return null;
    }

    /**
     * Returns the thread that runs this environment, creating it on first use. The thread is
     * created in the "Task Threads" group and named after the owner's task name with subtasks;
     * this method does not start it.
     *
     * @return the (possibly not yet started) executing thread
     */
    public Thread getExecutingThread() {
        synchronized (this) {
            if (this.executingThread == null) {
                this.executingThread = new Thread(TASK_THREADS, this, this.owner.getTaskNameWithSubtasks());
            }
            return this.executingThread;
        }
    }

    /**
     * Cancels the execution. Only the first call has an effect.
     *
     * <p>The invokable's {@code cancel()} is called (errors are logged, not propagated). If an
     * executing thread exists it is interrupted and joined; as long as it stays alive the
     * interrupt is repeated once per second. This method therefore blocks until the executing
     * thread has terminated. If the calling thread is interrupted while waiting, the waiting
     * continues and the caller's interrupt flag is restored before returning.
     */
    public void cancelExecution() {
        if (!this.canceled.compareAndSet(false, true)) {
            return;
        }

        LOG.info("Canceling {}", this.owner.getTaskNameWithSubtasks());

        try {
            this.invokable.cancel();
        } catch (Throwable t) {
            LOG.error("Error while cancelling the task.", t);
        }

        final Thread thread = this.executingThread;
        if (thread == null) {
            return;
        }

        // Remember an interruption of the caller instead of swallowing it; it is re-asserted
        // only at the very end so that the join calls below keep working.
        boolean callerInterrupted = false;

        thread.interrupt();
        try {
            // Initial grace period for the task to react to the interrupt.
            thread.join(5000L);
        } catch (InterruptedException e) {
            callerInterrupted = true;
        }

        while (thread.isAlive()) {
            LOG.warn("Task {} did not react to cancelling signal. Sending repeated interrupt.",
                    this.owner.getTaskNameWithSubtasks());

            if (LOG.isDebugEnabled()) {
                final StringBuilder trace = new StringBuilder("Task ")
                        .append(this.owner.getTaskNameWithSubtasks())
                        .append(" is stuck in method:\n");
                for (StackTraceElement element : thread.getStackTrace()) {
                    trace.append(element).append('\n');
                }
                LOG.debug(trace.toString());
            }

            thread.interrupt();
            try {
                thread.join(1000L);
            } catch (InterruptedException e) {
                callerInterrupted = true;
            }
        }

        if (callerInterrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Waits for every output gate to be closed. Returns immediately if the owner is already
     * canceled or failed.
     *
     * @throws InterruptedException if waiting on a gate is interrupted
     */
    private void waitForOutputChannelsToBeClosed() throws InterruptedException {
        if (this.owner.isCanceledOrFailed()) {
            return;
        }
        for (OutputGate gate : this.outputGates) {
            gate.waitForGateToBeClosed();
        }
    }

    /**
     * Polls, every {@link #SLEEPINTERVAL} milliseconds, until all input gates report to be closed.
     * Returns without further checks once {@link #cancelExecution()} has been called.
     *
     * @throws IOException if querying a gate fails
     * @throws InterruptedException if the owner is canceled or failed, or the sleep is interrupted
     */
    private void waitForInputChannelsToBeClosed() throws IOException, InterruptedException {
        while (!this.canceled.get()) {
            if (this.owner.isCanceledOrFailed()) {
                throw new InterruptedException();
            }

            // Every gate is queried on each round, even after an open one has been found.
            boolean allClosed = true;
            for (int i = 0; i < getNumberOfInputGates(); i++) {
                if (!this.inputGates.get(i).isClosed()) {
                    allClosed = false;
                }
            }

            if (allClosed) {
                return;
            }
            Thread.sleep(SLEEPINTERVAL);
        }
    }

    /**
     * Closes all input gates in registration order.
     *
     * @throws IOException if closing a gate fails
     * @throws InterruptedException if closing a gate is interrupted
     */
    private void closeInputGates() throws IOException, InterruptedException {
        for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
            gate.close();
        }
    }

    /**
     * Requests all output gates to close, in registration order.
     *
     * @throws IOException if the request fails for a gate
     * @throws InterruptedException if the request is interrupted
     */
    private void requestAllOutputGatesToClose() throws IOException, InterruptedException {
        for (OutputGate gate : this.outputGates) {
            gate.requestClose();
        }
    }

    /** {@inheritDoc} */
    @Override
    public IOManager getIOManager() {
        return this.ioManager;
    }

    /** {@inheritDoc} */
    @Override
    public MemoryManager getMemoryManager() {
        return this.memoryManager;
    }

    /** {@inheritDoc} */
    @Override
    public Configuration getTaskConfiguration() {
        return this.taskConfiguration;
    }

    /** {@inheritDoc} */
    @Override
    public Configuration getJobConfiguration() {
        return this.jobConfiguration;
    }

    /** {@inheritDoc} Delegates to the owning task. */
    @Override
    public int getCurrentNumberOfSubtasks() {
        return this.owner.getNumberOfSubtasks();
    }

    /** {@inheritDoc} Delegates to the owning task. */
    @Override
    public int getIndexInSubtaskGroup() {
        return this.owner.getSubtaskIndex();
    }

    /** {@inheritDoc} Delegates to the owning task. */
    @Override
    public String getTaskName() {
        return this.owner.getTaskName();
    }

    /** {@inheritDoc} */
    @Override
    public InputSplitProvider getInputSplitProvider() {
        return this.inputSplitProvider;
    }

    /**
     * Asks every input gate and then every output gate to release its channel resources.
     * {@link #run()} may call this more than once on the same path.
     */
    private void releaseAllChannelResources() {
        for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
            gate.releaseAllChannelResources();
        }
        for (OutputGate gate : this.outputGates) {
            gate.releaseAllChannelResources();
        }
    }

    /**
     * Collects the IDs of all channels of all output gates.
     *
     * @return an unmodifiable snapshot of the output channel IDs
     */
    @Override
    public Set<ChannelID> getOutputChannelIDs() {
        final Set<ChannelID> ids = new HashSet<>();
        for (OutputGate gate : this.outputGates) {
            for (OutputChannel channel : gate.channels()) {
                ids.add(channel.getID());
            }
        }
        return Collections.unmodifiableSet(ids);
    }

    /**
     * Collects the IDs of all channels of all input gates.
     *
     * @return an unmodifiable snapshot of the input channel IDs
     */
    @Override
    public Set<ChannelID> getInputChannelIDs() {
        final Set<ChannelID> ids = new HashSet<>();
        for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
            for (int i = 0; i < gate.getNumberOfInputChannels(); i++) {
                ids.add(gate.getInputChannel(i).getID());
            }
        }
        return Collections.unmodifiableSet(ids);
    }

    /**
     * Collects the IDs of all input gates.
     *
     * @return an unmodifiable snapshot of the input gate IDs
     */
    @Override
    public Set<GateID> getInputGateIDs() {
        final Set<GateID> ids = new HashSet<>();
        for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
            ids.add(gate.getGateID());
        }
        return Collections.unmodifiableSet(ids);
    }

    /**
     * Collects the IDs of all output gates.
     *
     * @return an unmodifiable snapshot of the output gate IDs
     */
    @Override
    public Set<GateID> getOutputGateIDs() {
        final Set<GateID> ids = new HashSet<>();
        for (OutputGate gate : this.outputGates) {
            ids.add(gate.getGateID());
        }
        return Collections.unmodifiableSet(ids);
    }

    /**
     * Collects the channel IDs of the first output gate whose ID equals {@code gateID}.
     *
     * @param gateID ID of the output gate to look up
     * @return an unmodifiable snapshot of that gate's channel IDs
     * @throws IllegalArgumentException if no output gate has the given ID
     */
    @Override
    public Set<ChannelID> getOutputChannelIDsOfGate(GateID gateID) {
        OutputGate match = null;
        for (OutputGate candidate : this.outputGates) {
            if (candidate.getGateID().equals(gateID)) {
                match = candidate;
                break;
            }
        }
        if (match == null) {
            throw new IllegalArgumentException("Cannot find output gate with ID " + gateID);
        }

        final Set<ChannelID> ids = new HashSet<>();
        for (int i = 0; i < match.getNumChannels(); i++) {
            ids.add(match.getChannel(i).getID());
        }
        return Collections.unmodifiableSet(ids);
    }

    /**
     * Collects the channel IDs of the first input gate whose ID equals {@code gateID}.
     *
     * @param gateID ID of the input gate to look up
     * @return an unmodifiable snapshot of that gate's channel IDs
     * @throws IllegalArgumentException if no input gate has the given ID
     */
    @Override
    public Set<ChannelID> getInputChannelIDsOfGate(GateID gateID) {
        InputGate<? extends IOReadableWritable> match = null;
        for (InputGate<? extends IOReadableWritable> candidate : this.inputGates) {
            if (candidate.getGateID().equals(gateID)) {
                match = candidate;
                break;
            }
        }
        if (match == null) {
            throw new IllegalArgumentException("Cannot find input gate with ID " + gateID);
        }

        final Set<ChannelID> ids = new HashSet<>();
        for (int i = 0; i < match.getNumberOfInputChannels(); i++) {
            ids.add(match.getInputChannel(i).getID());
        }
        return Collections.unmodifiableSet(ids);
    }

    /**
     * Returns the internal list of output gates.
     *
     * @return the live, mutable list backing this environment (not a copy)
     */
    public List<OutputGate> outputGates() {
        return this.outputGates;
    }

    /**
     * Returns the internal list of input gates.
     *
     * @return the live, mutable list backing this environment (not a copy)
     */
    public List<InputGate<? extends IOReadableWritable>> inputGates() {
        return this.inputGates;
    }

    /** {@inheritDoc} */
    @Override
    public AccumulatorProtocol getAccumulatorProtocolProxy() {
        return this.accumulatorProtocolProxy;
    }

    /** {@inheritDoc} */
    @Override
    public ClassLoader getUserClassLoader() {
        return this.userCodeClassLoader;
    }

    /**
     * Adds all given cache-file copy tasks, replacing existing entries with the same key.
     *
     * @param map copy tasks keyed by name
     */
    public void addCopyTasksForCacheFile(Map<String, FutureTask<Path>> map) {
        this.cacheCopyTasks.putAll(map);
    }

    /**
     * Adds a single cache-file copy task, replacing an existing entry with the same key.
     *
     * @param str key under which the task is stored
     * @param futureTask the copy task
     */
    public void addCopyTaskForCacheFile(String str, FutureTask<Path> futureTask) {
        this.cacheCopyTasks.put(str, futureTask);
    }

    /**
     * Returns the registered cache-file copy tasks.
     *
     * @return the live, mutable map backing this environment (not a copy)
     */
    @Override
    public Map<String, FutureTask<Path>> getCopyTask() {
        return this.cacheCopyTasks;
    }

    /**
     * Returns the buffer provider for the output side, which is this environment itself.
     *
     * @return {@code this}
     */
    @Override
    public BufferProvider getOutputBufferProvider() {
        return this;
    }

    /** {@inheritDoc} Delegates to the local output buffer pool. */
    @Override
    public Buffer requestBuffer(int i) throws IOException {
        return this.outputBufferPool.requestBuffer(i);
    }

    /** {@inheritDoc} Delegates to the local output buffer pool. */
    @Override
    public Buffer requestBufferBlocking(int i) throws IOException, InterruptedException {
        return this.outputBufferPool.requestBufferBlocking(i);
    }

    /** {@inheritDoc} Delegates to the local output buffer pool. */
    @Override
    public int getBufferSize() {
        return this.outputBufferPool.getBufferSize();
    }

    /** {@inheritDoc} Delegates to the local output buffer pool. */
    @Override
    public BufferProvider.BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener bufferAvailabilityListener) {
        return this.outputBufferPool.registerBufferAvailabilityListener(bufferAvailabilityListener);
    }

    /**
     * Returns the number of channels relevant for the local buffer pool, which is the number of
     * output channels.
     *
     * @return total number of output channels
     */
    @Override
    public int getNumberOfChannels() {
        return getNumberOfOutputChannels();
    }

    /** {@inheritDoc} Delegates to the local output buffer pool. */
    @Override
    public void setDesignatedNumberOfBuffers(int i) {
        this.outputBufferPool.setNumDesignatedBuffers(i);
    }

    /** {@inheritDoc} Destroys the local output buffer pool. */
    @Override
    public void clearLocalBufferPool() {
        this.outputBufferPool.destroy();
    }

    /**
     * Creates the local output buffer pool on top of the given global pool. Calls made after the
     * pool has been created are ignored.
     *
     * @param globalBufferPool global pool the local pool is created from
     */
    @Override
    public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
        if (this.outputBufferPool == null) {
            this.outputBufferPool = new LocalBufferPool(globalBufferPool, 1);
        }
    }

    /** Logs the available, requested and designated buffer counts of the local pool at INFO level. */
    @Override
    public void logBufferUtilization() {
        LOG.info(String.format("\t%s: %d available, %d requested, %d designated",
                this.owner.getTaskNameWithSubtasks(),
                this.outputBufferPool.numAvailableBuffers(),
                this.outputBufferPool.numRequestedBuffers(),
                this.outputBufferPool.numDesignatedBuffers()));
    }

    /** {@inheritDoc} Delegates to the local output buffer pool. */
    @Override
    public void reportAsynchronousEvent() {
        this.outputBufferPool.reportAsynchronousEvent();
    }
}
