/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.execution;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.Buffer;
import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPool;
import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.io.network.channels.OutputChannel;
import org.apache.flink.runtime.io.network.gates.GateID;
import org.apache.flink.runtime.io.network.gates.InputGate;
import org.apache.flink.runtime.io.network.gates.OutputGate;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RuntimeEnvironment
implements Environment,
BufferProvider,
LocalBufferPoolOwner,
Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(RuntimeEnvironment.class);
    private static final ThreadGroup TASK_THREADS = new ThreadGroup("Task Threads");
    private static final int SLEEPINTERVAL = 100;
    private final Task owner;
    private final Configuration jobConfiguration;
    private final Configuration taskConfiguration;
    private final ClassLoader userCodeClassLoader;
    private final Class<? extends AbstractInvokable> invokableClass;
    private final AbstractInvokable invokable;
    private final ArrayList<OutputGate> outputGates = new ArrayList();
    private final ArrayList<InputGate<? extends IOReadableWritable>> inputGates = new ArrayList();
    private final Queue<GateID> unboundInputGateIDs = new ArrayDeque<GateID>();
    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final InputSplitProvider inputSplitProvider;
    private Thread executingThread;
    private final AccumulatorProtocol accumulatorProtocolProxy;
    private final Map<String, FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
    private final BroadcastVariableManager bcVarManager;
    private LocalBufferPool outputBufferPool;
    private AtomicBoolean canceled = new AtomicBoolean();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public RuntimeEnvironment(Task owner, TaskDeploymentDescriptor tdd, ClassLoader userCodeClassLoader, MemoryManager memoryManager, IOManager ioManager, InputSplitProvider inputSplitProvider, AccumulatorProtocol accumulatorProtocolProxy, BroadcastVariableManager bcVarManager) throws Exception {
        int i;
        Preconditions.checkNotNull((Object)owner);
        Preconditions.checkNotNull((Object)memoryManager);
        Preconditions.checkNotNull((Object)ioManager);
        Preconditions.checkNotNull((Object)inputSplitProvider);
        Preconditions.checkNotNull((Object)accumulatorProtocolProxy);
        Preconditions.checkNotNull((Object)userCodeClassLoader);
        Preconditions.checkNotNull((Object)bcVarManager);
        this.owner = owner;
        this.memoryManager = memoryManager;
        this.ioManager = ioManager;
        this.inputSplitProvider = inputSplitProvider;
        this.accumulatorProtocolProxy = accumulatorProtocolProxy;
        this.bcVarManager = bcVarManager;
        this.userCodeClassLoader = userCodeClassLoader;
        try {
            String className = tdd.getInvokableClassName();
            this.invokableClass = Class.forName(className, true, userCodeClassLoader).asSubclass(AbstractInvokable.class);
        }
        catch (Throwable t) {
            throw new Exception("Could not load invokable class.", t);
        }
        try {
            this.invokable = this.invokableClass.newInstance();
        }
        catch (Throwable t) {
            throw new Exception("Could not instantiate the invokable class.", t);
        }
        this.jobConfiguration = tdd.getJobConfiguration();
        this.taskConfiguration = tdd.getTaskConfiguration();
        this.invokable.setEnvironment(this);
        Thread currentThread = Thread.currentThread();
        ClassLoader context = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(userCodeClassLoader);
        try {
            this.invokable.registerInputOutput();
        }
        finally {
            currentThread.setContextClassLoader(context);
        }
        List<GateDeploymentDescriptor> inGates = tdd.getInputGates();
        List<GateDeploymentDescriptor> outGates = tdd.getOutputGates();
        if (this.inputGates.size() != inGates.size()) {
            throw new Exception("The number of readers created in 'registerInputOutput()' is different than the number of connected incoming edges in the job graph.");
        }
        if (this.outputGates.size() != outGates.size()) {
            throw new Exception("The number of writers created in 'registerInputOutput()' is different than the number of connected outgoing edges in the job graph.");
        }
        for (i = 0; i < inGates.size(); ++i) {
            this.inputGates.get(i).initializeChannels(inGates.get(i));
        }
        for (i = 0; i < outGates.size(); ++i) {
            this.outputGates.get(i).initializeChannels(outGates.get(i));
        }
    }

    public AbstractInvokable getInvokable() {
        return this.invokable;
    }

    @Override
    public JobID getJobID() {
        return this.owner.getJobID();
    }

    @Override
    public JobVertexID getJobVertexId() {
        return this.owner.getVertexID();
    }

    @Override
    public GateID getNextUnboundInputGateID() {
        return this.unboundInputGateIDs.poll();
    }

    @Override
    public OutputGate createAndRegisterOutputGate() {
        OutputGate gate = new OutputGate(this.getJobID(), new GateID(), this.getNumberOfOutputGates());
        this.outputGates.add(gate);
        return gate;
    }

    @Override
    public void run() {
        if (this.owner.isCanceledOrFailed()) {
            this.owner.cancelingDone();
            return;
        }
        try {
            Thread.currentThread().setContextClassLoader(this.userCodeClassLoader);
            this.invokable.invoke();
            if (this.owner.isCanceledOrFailed()) {
                throw new CancelTaskException();
            }
            this.closeInputGates();
            this.requestAllOutputGatesToClose();
            this.waitForInputChannelsToBeClosed();
            this.waitForOutputChannelsToBeClosed();
            if (this.owner.isCanceledOrFailed()) {
                throw new CancelTaskException();
            }
            if (!this.owner.markAsFinished()) {
                throw new Exception("Could notify job manager that the task is finished.");
            }
        }
        catch (Throwable t) {
            if (!this.owner.isCanceledOrFailed()) {
                try {
                    this.invokable.cancel();
                }
                catch (Throwable t2) {
                    LOG.error("Error while canceling the task", t2);
                }
            }
            this.releaseAllChannelResources();
            if (this.owner.isCanceledOrFailed() || t instanceof CancelTaskException) {
                this.owner.cancelingDone();
            } else {
                this.owner.markFailed(t);
            }
        }
        finally {
            this.releaseAllChannelResources();
        }
    }

    @Override
    public <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate() {
        InputGate gate = new InputGate(this.getJobID(), new GateID(), this.getNumberOfInputGates());
        this.inputGates.add(gate);
        return gate;
    }

    @Override
    public int getNumberOfOutputGates() {
        return this.outputGates.size();
    }

    @Override
    public int getNumberOfInputGates() {
        return this.inputGates.size();
    }

    @Override
    public int getNumberOfOutputChannels() {
        int numberOfOutputChannels = 0;
        for (int i = 0; i < this.outputGates.size(); ++i) {
            numberOfOutputChannels += this.outputGates.get(i).getNumChannels();
        }
        return numberOfOutputChannels;
    }

    @Override
    public int getNumberOfInputChannels() {
        int numberOfInputChannels = 0;
        for (int i = 0; i < this.inputGates.size(); ++i) {
            numberOfInputChannels += this.inputGates.get(i).getNumberOfInputChannels();
        }
        return numberOfInputChannels;
    }

    public InputGate<? extends IOReadableWritable> getInputGate(int pos) {
        if (pos < this.inputGates.size()) {
            return this.inputGates.get(pos);
        }
        return null;
    }

    public OutputGate getOutputGate(int index) {
        if (index < this.outputGates.size()) {
            return this.outputGates.get(index);
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Thread getExecutingThread() {
        RuntimeEnvironment runtimeEnvironment = this;
        synchronized (runtimeEnvironment) {
            if (this.executingThread == null) {
                String name = this.owner.getTaskNameWithSubtasks();
                this.executingThread = new Thread(TASK_THREADS, this, name);
            }
            return this.executingThread;
        }
    }

    public void cancelExecution() {
        Thread executingThread;
        if (!this.canceled.compareAndSet(false, true)) {
            return;
        }
        LOG.info("Canceling " + this.owner.getTaskNameWithSubtasks());
        if (this.invokable != null) {
            try {
                this.invokable.cancel();
            }
            catch (Throwable e) {
                LOG.error("Error while cancelling the task.", e);
            }
        }
        if ((executingThread = this.executingThread) != null) {
            executingThread.interrupt();
            try {
                executingThread.join(5000L);
            }
            catch (InterruptedException e) {
                // empty catch block
            }
            if (!executingThread.isAlive()) {
                return;
            }
            while (executingThread != null && executingThread.isAlive()) {
                LOG.warn("Task " + this.owner.getTaskNameWithSubtasks() + " did not react to cancelling signal. Sending repeated interrupt.");
                if (LOG.isDebugEnabled()) {
                    StackTraceElement[] stack;
                    StringBuilder bld = new StringBuilder("Task ").append(this.owner.getTaskNameWithSubtasks()).append(" is stuck in method:\n");
                    for (StackTraceElement e : stack = executingThread.getStackTrace()) {
                        bld.append(e).append('\n');
                    }
                    LOG.debug(bld.toString());
                }
                executingThread.interrupt();
                try {
                    executingThread.join(1000L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
    }

    private void waitForOutputChannelsToBeClosed() throws InterruptedException {
        if (this.owner.isCanceledOrFailed()) {
            return;
        }
        for (OutputGate og : this.outputGates) {
            og.waitForGateToBeClosed();
        }
    }

    private void waitForInputChannelsToBeClosed() throws IOException, InterruptedException {
        while (!this.canceled.get()) {
            if (this.owner.isCanceledOrFailed()) {
                throw new InterruptedException();
            }
            boolean allClosed = true;
            for (int i = 0; i < this.getNumberOfInputGates(); ++i) {
                InputGate<? extends IOReadableWritable> eig = this.inputGates.get(i);
                if (eig.isClosed()) continue;
                allClosed = false;
            }
            if (allClosed) break;
            Thread.sleep(100L);
        }
    }

    private void closeInputGates() throws IOException, InterruptedException {
        for (int i = 0; i < this.inputGates.size(); ++i) {
            InputGate<? extends IOReadableWritable> eig = this.inputGates.get(i);
            eig.close();
        }
    }

    private void requestAllOutputGatesToClose() throws IOException, InterruptedException {
        for (int i = 0; i < this.outputGates.size(); ++i) {
            this.outputGates.get(i).requestClose();
        }
    }

    @Override
    public IOManager getIOManager() {
        return this.ioManager;
    }

    @Override
    public MemoryManager getMemoryManager() {
        return this.memoryManager;
    }

    @Override
    public BroadcastVariableManager getBroadcastVariableManager() {
        return this.bcVarManager;
    }

    @Override
    public Configuration getTaskConfiguration() {
        return this.taskConfiguration;
    }

    @Override
    public Configuration getJobConfiguration() {
        return this.jobConfiguration;
    }

    @Override
    public int getCurrentNumberOfSubtasks() {
        return this.owner.getNumberOfSubtasks();
    }

    @Override
    public int getIndexInSubtaskGroup() {
        return this.owner.getSubtaskIndex();
    }

    @Override
    public String getTaskName() {
        return this.owner.getTaskName();
    }

    @Override
    public InputSplitProvider getInputSplitProvider() {
        return this.inputSplitProvider;
    }

    private void releaseAllChannelResources() {
        int i;
        for (i = 0; i < this.inputGates.size(); ++i) {
            this.inputGates.get(i).releaseAllChannelResources();
        }
        for (i = 0; i < this.outputGates.size(); ++i) {
            this.outputGates.get(i).releaseAllChannelResources();
        }
    }

    @Override
    public Set<ChannelID> getOutputChannelIDs() {
        HashSet<ChannelID> ids = new HashSet<ChannelID>();
        for (OutputGate gate : this.outputGates) {
            for (OutputChannel channel : gate.channels()) {
                ids.add(channel.getID());
            }
        }
        return Collections.unmodifiableSet(ids);
    }

    @Override
    public Set<ChannelID> getInputChannelIDs() {
        HashSet<ChannelID> inputChannelIDs = new HashSet<ChannelID>();
        for (InputGate<? extends IOReadableWritable> outputGate : this.inputGates) {
            for (int i = 0; i < outputGate.getNumberOfInputChannels(); ++i) {
                inputChannelIDs.add(outputGate.getInputChannel(i).getID());
            }
        }
        return Collections.unmodifiableSet(inputChannelIDs);
    }

    @Override
    public Set<GateID> getInputGateIDs() {
        HashSet<GateID> inputGateIDs = new HashSet<GateID>();
        Iterator<InputGate<? extends IOReadableWritable>> gateIterator = this.inputGates.iterator();
        while (gateIterator.hasNext()) {
            inputGateIDs.add(gateIterator.next().getGateID());
        }
        return Collections.unmodifiableSet(inputGateIDs);
    }

    @Override
    public Set<GateID> getOutputGateIDs() {
        HashSet<GateID> outputGateIDs = new HashSet<GateID>();
        Iterator<OutputGate> gateIterator = this.outputGates.iterator();
        while (gateIterator.hasNext()) {
            outputGateIDs.add(gateIterator.next().getGateID());
        }
        return Collections.unmodifiableSet(outputGateIDs);
    }

    @Override
    public Set<ChannelID> getOutputChannelIDsOfGate(GateID gateID) {
        OutputGate outputGate = null;
        for (OutputGate candidateGate : this.outputGates) {
            if (!candidateGate.getGateID().equals(gateID)) continue;
            outputGate = candidateGate;
            break;
        }
        if (outputGate == null) {
            throw new IllegalArgumentException("Cannot find output gate with ID " + gateID);
        }
        HashSet<ChannelID> outputChannelIDs = new HashSet<ChannelID>();
        for (int i = 0; i < outputGate.getNumChannels(); ++i) {
            outputChannelIDs.add(outputGate.getChannel(i).getID());
        }
        return Collections.unmodifiableSet(outputChannelIDs);
    }

    @Override
    public Set<ChannelID> getInputChannelIDsOfGate(GateID gateID) {
        InputGate<? extends IOReadableWritable> inputGate = null;
        for (InputGate<? extends IOReadableWritable> candidateGate : this.inputGates) {
            if (!candidateGate.getGateID().equals(gateID)) continue;
            inputGate = candidateGate;
            break;
        }
        if (inputGate == null) {
            throw new IllegalArgumentException("Cannot find input gate with ID " + gateID);
        }
        HashSet<ChannelID> inputChannelIDs = new HashSet<ChannelID>();
        for (int i = 0; i < inputGate.getNumberOfInputChannels(); ++i) {
            inputChannelIDs.add(inputGate.getInputChannel(i).getID());
        }
        return Collections.unmodifiableSet(inputChannelIDs);
    }

    public List<OutputGate> outputGates() {
        return this.outputGates;
    }

    public List<InputGate<? extends IOReadableWritable>> inputGates() {
        return this.inputGates;
    }

    @Override
    public AccumulatorProtocol getAccumulatorProtocolProxy() {
        return this.accumulatorProtocolProxy;
    }

    @Override
    public ClassLoader getUserClassLoader() {
        return this.userCodeClassLoader;
    }

    public void addCopyTasksForCacheFile(Map<String, FutureTask<Path>> copyTasks) {
        this.cacheCopyTasks.putAll(copyTasks);
    }

    public void addCopyTaskForCacheFile(String name, FutureTask<Path> copyTask) {
        this.cacheCopyTasks.put(name, copyTask);
    }

    @Override
    public Map<String, FutureTask<Path>> getCopyTask() {
        return this.cacheCopyTasks;
    }

    @Override
    public BufferProvider getOutputBufferProvider() {
        return this;
    }

    @Override
    public Buffer requestBuffer(int minBufferSize) throws IOException {
        return this.outputBufferPool.requestBuffer(minBufferSize);
    }

    @Override
    public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
        return this.outputBufferPool.requestBufferBlocking(minBufferSize);
    }

    @Override
    public int getBufferSize() {
        return this.outputBufferPool.getBufferSize();
    }

    @Override
    public BufferProvider.BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
        return this.outputBufferPool.registerBufferAvailabilityListener(listener);
    }

    @Override
    public int getNumberOfChannels() {
        return this.getNumberOfOutputChannels();
    }

    @Override
    public void setDesignatedNumberOfBuffers(int numBuffers) {
        this.outputBufferPool.setNumDesignatedBuffers(numBuffers);
    }

    @Override
    public void clearLocalBufferPool() {
        this.outputBufferPool.destroy();
    }

    @Override
    public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
        if (this.outputBufferPool == null) {
            this.outputBufferPool = new LocalBufferPool(globalBufferPool, 1);
        }
    }

    @Override
    public void logBufferUtilization() {
        LOG.info(String.format("\t%s: %d available, %d requested, %d designated", this.owner.getTaskNameWithSubtasks(), this.outputBufferPool.numAvailableBuffers(), this.outputBufferPool.numRequestedBuffers(), this.outputBufferPool.numDesignatedBuffers()));
    }

    @Override
    public void reportAsynchronousEvent() {
        this.outputBufferPool.reportAsynchronousEvent();
    }
}

