/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.EventSender;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.SubtaskAccess;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedValue;

/**
 * {@link OperatorCoordinator.SubtaskGateway} implementation that serializes operator events and
 * hands them to an {@link EventSender}, using a {@link SubtaskAccess} for everything that
 * concerns the target subtask (readiness, send action, naming, failover).
 */
class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {

    /** Format string for the failover cause; arguments are the event and the subtask name. */
    private static final String EVENT_LOSS_ERROR_MESSAGE =
            "An OperatorEvent from an OperatorCoordinator to a task was lost. "
                    + "Triggering task failover to ensure consistency. Event: '%s', targetTask: %s";

    /** Access to the subtask this gateway sends events to. */
    private final SubtaskAccess subtaskAccess;

    /** Component that performs the actual send and completes the result future. */
    private final EventSender sender;

    /** Executor on which both the send and the failure handling are run. */
    private final Executor sendingExecutor;

    /**
     * Creates a gateway for a single subtask.
     *
     * @param subtaskAccess access to the target subtask
     * @param sender sender that is given the send action and the result future
     * @param sendingExecutor executor used to run the send and the failure handler
     */
    SubtaskGatewayImpl(SubtaskAccess subtaskAccess, EventSender sender, Executor sendingExecutor) {
        this.subtaskAccess = subtaskAccess;
        this.sender = sender;
        this.sendingExecutor = sendingExecutor;
    }

    /**
     * Serializes the given event and schedules it for sending on the sending executor.
     *
     * <p>If the returned future completes exceptionally while the subtask reports that it is
     * still running, a task failover is triggered with a {@link FlinkException} that wraps the
     * failure.
     *
     * @param evt the event to send
     * @return the future that is passed to the {@link EventSender} together with the send action
     * @throws FlinkRuntimeException if the subtask's "switched to running" future is not yet
     *     done, or if the event cannot be serialized
     */
    @Override
    public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt) {
        if (!isReady()) {
            throw new FlinkRuntimeException("SubtaskGateway is not ready, task not yet running.");
        }

        final SerializedValue<OperatorEvent> serializedEvent;
        try {
            serializedEvent = new SerializedValue<>(evt);
        } catch (IOException e) {
            // This method declares no checked exceptions, so the failure is rethrown unchecked
            // with the original exception preserved as the cause.
            throw new FlinkRuntimeException("Cannot serialize operator event", e);
        }

        final Callable<CompletableFuture<Acknowledge>> sendAction =
                subtaskAccess.createEventSendAction(serializedEvent);

        final CompletableFuture<Acknowledge> result = new CompletableFuture<>();

        // Register the failure handler before the send is scheduled. The handler stage itself is
        // passed to FutureUtils.assertNoException (see that method's documentation for how an
        // exception thrown by the handler is treated).
        FutureUtils.assertNoException(
                result.handleAsync(
                        (success, failure) -> {
                            if (failure != null && subtaskAccess.isStillRunning()) {
                                String msg =
                                        String.format(
                                                EVENT_LOSS_ERROR_MESSAGE,
                                                evt,
                                                subtaskAccess.subtaskName());
                                subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure));
                            }
                            return null;
                        },
                        sendingExecutor));

        sendingExecutor.execute(() -> sender.sendEvent(sendAction, result));
        return result;
    }

    /**
     * Returns the execution attempt reported by the subtask access as its current attempt.
     *
     * @return the current execution attempt ID
     */
    @Override
    public ExecutionAttemptID getExecution() {
        return subtaskAccess.currentAttempt();
    }

    /**
     * Returns the subtask index reported by the subtask access.
     *
     * @return the subtask index
     */
    @Override
    public int getSubtask() {
        return subtaskAccess.getSubtaskIndex();
    }

    /**
     * Checks whether the subtask's "switched to running" future is done.
     *
     * @return {@code true} if that future is done, {@code false} otherwise
     */
    private boolean isReady() {
        return subtaskAccess.hasSwitchedToRunning().isDone();
    }
}

