/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import akka.dispatch.OnFailure;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.PartialFunction;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

public class ActorGatewayResultPartitionConsumableNotifier
implements ResultPartitionConsumableNotifier {
    private static final Logger LOG = LoggerFactory.getLogger(ActorGatewayResultPartitionConsumableNotifier.class);
    private final ExecutionContext executionContext;
    private final ActorGateway jobManager;
    private final FiniteDuration jobManagerMessageTimeout;

    public ActorGatewayResultPartitionConsumableNotifier(ExecutionContext executionContext, ActorGateway jobManager, FiniteDuration jobManagerMessageTimeout) {
        this.executionContext = (ExecutionContext)Preconditions.checkNotNull((Object)executionContext);
        this.jobManager = (ActorGateway)Preconditions.checkNotNull((Object)jobManager);
        this.jobManagerMessageTimeout = (FiniteDuration)Preconditions.checkNotNull((Object)jobManagerMessageTimeout);
    }

    @Override
    public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
        JobManagerMessages.ScheduleOrUpdateConsumers msg = new JobManagerMessages.ScheduleOrUpdateConsumers(jobId, partitionId);
        Future<Object> futureResponse = this.jobManager.ask(msg, this.jobManagerMessageTimeout);
        futureResponse.onFailure((PartialFunction)new OnFailure(){

            public void onFailure(Throwable failure) {
                LOG.error("Could not schedule or update consumers at the JobManager.", failure);
                taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers", failure));
            }
        }, this.executionContext);
    }
}

