/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network;

import akka.actor.ActorRef;
import akka.dispatch.OnFailure;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.IOException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.PartialFunction;
import scala.Tuple2;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

public class NetworkEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
    private final Object lock = new Object();
    private final NetworkEnvironmentConfiguration configuration;
    private final FiniteDuration jobManagerTimeout;
    private final NetworkBufferPool networkBufferPool;
    private ConnectionManager connectionManager;
    private ResultPartitionManager partitionManager;
    private TaskEventDispatcher taskEventDispatcher;
    private ResultPartitionConsumableNotifier partitionConsumableNotifier;
    private PartitionStateChecker partitionStateChecker;
    private boolean isShutdown;

    public NetworkEnvironment(FiniteDuration jobManagerTimeout, NetworkEnvironmentConfiguration config) throws IOException {
        this.configuration = Preconditions.checkNotNull(config);
        this.jobManagerTimeout = Preconditions.checkNotNull(jobManagerTimeout);
        try {
            this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
        }
        catch (Throwable t) {
            throw new IOException("Cannot allocate network buffer pool: " + t.getMessage(), t);
        }
    }

    public ResultPartitionManager getPartitionManager() {
        return this.partitionManager;
    }

    public TaskEventDispatcher getTaskEventDispatcher() {
        return this.taskEventDispatcher;
    }

    public ConnectionManager getConnectionManager() {
        return this.connectionManager;
    }

    public NetworkBufferPool getNetworkBufferPool() {
        return this.networkBufferPool;
    }

    public IOManager.IOMode getDefaultIOMode() {
        return this.configuration.ioMode();
    }

    public ResultPartitionConsumableNotifier getPartitionConsumableNotifier() {
        return this.partitionConsumableNotifier;
    }

    public PartitionStateChecker getPartitionStateChecker() {
        return this.partitionStateChecker;
    }

    public Tuple2<Integer, Integer> getPartitionRequestInitialAndMaxBackoff() {
        return this.configuration.partitionRequestInitialAndMaxBackoff();
    }

    public boolean isAssociated() {
        return this.partitionConsumableNotifier != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void associateWithTaskManagerAndJobManager(ActorRef jobManagerRef, ActorRef taskManagerRef) throws IOException {
        Preconditions.checkNotNull(jobManagerRef);
        Preconditions.checkNotNull(taskManagerRef);
        Object object = this.lock;
        synchronized (object) {
            if (this.isShutdown) {
                throw new IllegalStateException("environment is shut down");
            }
            if (this.partitionConsumableNotifier == null && this.partitionManager == null && this.taskEventDispatcher == null && this.connectionManager == null) {
                LOG.debug("Starting result partition manager and network connection manager");
                this.partitionManager = new ResultPartitionManager();
                this.taskEventDispatcher = new TaskEventDispatcher();
                this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(jobManagerRef, taskManagerRef, new Timeout(this.jobManagerTimeout));
                this.partitionStateChecker = new JobManagerPartitionStateChecker(jobManagerRef, taskManagerRef);
                Option<NettyConfig> nettyConfig = this.configuration.nettyConfig();
                this.connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager((NettyConfig)nettyConfig.get()) : new LocalConnectionManager();
                try {
                    LOG.debug("Starting network connection manager");
                    this.connectionManager.start(this.partitionManager, this.taskEventDispatcher, this.networkBufferPool);
                }
                catch (Throwable t) {
                    throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
                }
            } else {
                throw new IllegalStateException("Network Environment is already associated with a JobManager/TaskManager");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void disassociate() throws IOException {
        Object object = this.lock;
        synchronized (object) {
            if (!this.isAssociated()) {
                return;
            }
            LOG.debug("Disassociating NetworkEnvironment from TaskManager. Cleaning all intermediate results.");
            if (this.connectionManager != null) {
                try {
                    LOG.debug("Shutting down network connection manager");
                    this.connectionManager.shutdown();
                    this.connectionManager = null;
                }
                catch (Throwable t) {
                    throw new IOException("Cannot shutdown network connection manager", t);
                }
            }
            if (this.partitionManager != null) {
                try {
                    LOG.debug("Shutting down intermediate result partition manager");
                    this.partitionManager.shutdown();
                    this.partitionManager = null;
                }
                catch (Throwable t) {
                    throw new IOException("Cannot shutdown partition manager", t);
                }
            }
            this.partitionConsumableNotifier = null;
            this.partitionStateChecker = null;
            if (this.taskEventDispatcher != null) {
                this.taskEventDispatcher.clearAll();
                this.taskEventDispatcher = null;
            }
            this.networkBufferPool.destroyAllBufferPools();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerTask(Task task) throws IOException {
        ResultPartition[] producedPartitions = task.getProducedPartitions();
        ResultPartitionWriter[] writers = task.getAllWriters();
        if (writers.length != producedPartitions.length) {
            throw new IllegalStateException("Unequal number of writers and partitions.");
        }
        Object object = this.lock;
        synchronized (object) {
            SingleInputGate[] inputGates;
            if (this.isShutdown) {
                throw new IllegalStateException("NetworkEnvironment is shut down");
            }
            if (!this.isAssociated()) {
                throw new IllegalStateException("NetworkEnvironment is not associated with a TaskManager");
            }
            for (int i = 0; i < producedPartitions.length; ++i) {
                ResultPartition partition = producedPartitions[i];
                ResultPartitionWriter writer = writers[i];
                BufferPool bufferPool = null;
                try {
                    bufferPool = this.networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
                    partition.registerBufferPool(bufferPool);
                    this.partitionManager.registerResultPartition(partition);
                }
                catch (Throwable t) {
                    if (bufferPool != null) {
                        bufferPool.lazyDestroy();
                    }
                    if (t instanceof IOException) {
                        throw (IOException)t;
                    }
                    throw new IOException(t.getMessage(), t);
                }
                this.taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
            }
            for (SingleInputGate gate : inputGates = task.getAllInputGates()) {
                BufferPool bufferPool = null;
                try {
                    bufferPool = this.networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
                    gate.setBufferPool(bufferPool);
                }
                catch (Throwable t) {
                    if (bufferPool != null) {
                        bufferPool.lazyDestroy();
                    }
                    if (t instanceof IOException) {
                        throw (IOException)t;
                    }
                    throw new IOException(t.getMessage(), t);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unregisterTask(Task task) {
        LOG.debug("Unregister task {} from network environment (state: {}).", (Object)task.getTaskNameWithSubtasks(), (Object)task.getExecutionState());
        ExecutionAttemptID executionId = task.getExecutionId();
        Object object = this.lock;
        synchronized (object) {
            SingleInputGate[] inputGates;
            ResultPartition[] partitions;
            ResultPartitionWriter[] writers;
            if (this.isShutdown || !this.isAssociated()) {
                return;
            }
            if (task.isCanceledOrFailed()) {
                this.partitionManager.releasePartitionsProducedBy(executionId, task.getFailureCause());
            }
            if ((writers = task.getAllWriters()) != null) {
                for (ResultPartitionWriter writer : writers) {
                    this.taskEventDispatcher.unregisterWriter(writer);
                }
            }
            if ((partitions = task.getProducedPartitions()) != null) {
                for (ResultPartition partition : partitions) {
                    partition.destroyBufferPool();
                }
            }
            if ((inputGates = task.getAllInputGates()) != null) {
                for (SingleInputGate gate : inputGates) {
                    try {
                        if (gate == null) continue;
                        gate.releaseAllResources();
                    }
                    catch (IOException e) {
                        LOG.error("Error during release of reader resources: " + e.getMessage(), (Throwable)e);
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        Object object = this.lock;
        synchronized (object) {
            if (this.isShutdown) {
                return;
            }
            try {
                this.disassociate();
            }
            catch (Throwable t) {
                LOG.warn("Network services did not shut down properly: " + t.getMessage(), t);
            }
            try {
                this.networkBufferPool.destroy();
            }
            catch (Throwable t) {
                LOG.warn("Network buffer pool did not shut down properly: " + t.getMessage(), t);
            }
            this.isShutdown = true;
        }
    }

    public boolean isShutdown() {
        return this.isShutdown;
    }

    private static class JobManagerPartitionStateChecker
    implements PartitionStateChecker {
        private final ActorRef jobManager;
        private final ActorRef taskManager;

        public JobManagerPartitionStateChecker(ActorRef jobManager, ActorRef taskManager) {
            this.jobManager = jobManager;
            this.taskManager = taskManager;
        }

        @Override
        public void triggerPartitionStateCheck(JobID jobId, ExecutionAttemptID executionAttemptID, IntermediateDataSetID resultId, ResultPartitionID partitionId) {
            JobManagerMessages.RequestPartitionState msg = new JobManagerMessages.RequestPartitionState(jobId, partitionId, executionAttemptID, resultId);
            this.jobManager.tell((Object)msg, this.taskManager);
        }
    }

    private static class JobManagerResultPartitionConsumableNotifier
    implements ResultPartitionConsumableNotifier {
        private final ActorRef jobManager;
        private final ActorRef taskManager;
        private final Timeout jobManagerMessageTimeout;

        public JobManagerResultPartitionConsumableNotifier(ActorRef jobManager, ActorRef taskManager, Timeout jobManagerMessageTimeout) {
            this.jobManager = jobManager;
            this.taskManager = taskManager;
            this.jobManagerMessageTimeout = jobManagerMessageTimeout;
        }

        @Override
        public void notifyPartitionConsumable(JobID jobId, final ResultPartitionID partitionId) {
            JobManagerMessages.ScheduleOrUpdateConsumers msg = new JobManagerMessages.ScheduleOrUpdateConsumers(jobId, partitionId);
            Future futureResponse = Patterns.ask((ActorRef)this.jobManager, (Object)msg, (Timeout)this.jobManagerMessageTimeout);
            futureResponse.onFailure((PartialFunction)new OnFailure(){

                public void onFailure(Throwable failure) {
                    LOG.error("Could not schedule or update consumers at the JobManager.", failure);
                    TaskMessages.FailTask failMsg = new TaskMessages.FailTask(partitionId.getProducerId(), new RuntimeException("Could not notify JobManager to schedule or update consumers", failure));
                    JobManagerResultPartitionConsumableNotifier.this.taskManager.tell((Object)failMsg, ActorRef.noSender());
                }
            }, AkkaUtils.globalExecutionContext());
        }
    }
}

