/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventValve;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OperatorCoordinatorHolder
implements OperatorCoordinator,
OperatorCoordinatorCheckpointContext {
    private final OperatorCoordinator coordinator;
    private final OperatorID operatorId;
    private final LazyInitializedCoordinatorContext context;
    private final OperatorEventValve eventValve;
    private final int operatorParallelism;
    private final int operatorMaxParallelism;
    private Consumer<Throwable> globalFailureHandler;
    private ComponentMainThreadExecutor mainThreadExecutor;

    private OperatorCoordinatorHolder(OperatorID operatorId, OperatorCoordinator coordinator, LazyInitializedCoordinatorContext context, OperatorEventValve eventValve, int operatorParallelism, int operatorMaxParallelism) {
        this.operatorId = (OperatorID)((Object)Preconditions.checkNotNull((Object)((Object)operatorId)));
        this.coordinator = (OperatorCoordinator)Preconditions.checkNotNull((Object)coordinator);
        this.context = (LazyInitializedCoordinatorContext)Preconditions.checkNotNull((Object)context);
        this.eventValve = (OperatorEventValve)Preconditions.checkNotNull((Object)eventValve);
        this.operatorParallelism = operatorParallelism;
        this.operatorMaxParallelism = operatorMaxParallelism;
    }

    public void lazyInitialize(SchedulerNG scheduler, ComponentMainThreadExecutor mainThreadExecutor) {
        this.lazyInitialize(scheduler::handleGlobalFailure, mainThreadExecutor);
    }

    @VisibleForTesting
    void lazyInitialize(Consumer<Throwable> globalFailureHandler, ComponentMainThreadExecutor mainThreadExecutor) {
        this.globalFailureHandler = globalFailureHandler;
        this.mainThreadExecutor = mainThreadExecutor;
        this.context.lazyInitialize(globalFailureHandler, mainThreadExecutor);
    }

    public OperatorCoordinator coordinator() {
        return this.coordinator;
    }

    @Override
    public OperatorID operatorId() {
        return this.operatorId;
    }

    @Override
    public int maxParallelism() {
        return this.operatorMaxParallelism;
    }

    @Override
    public int currentParallelism() {
        return this.operatorParallelism;
    }

    @Override
    public void start() throws Exception {
        this.mainThreadExecutor.assertRunningInMainThread();
        Preconditions.checkState((boolean)this.context.isInitialized(), (Object)"Coordinator Context is not yet initialized");
        this.coordinator.start();
    }

    @Override
    public void close() throws Exception {
        this.coordinator.close();
        this.context.unInitialize();
    }

    @Override
    public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.coordinator.handleEventFromOperator(subtask, event);
    }

    @Override
    public void subtaskFailed(int subtask, @Nullable Throwable reason) {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.coordinator.subtaskFailed(subtask, reason);
        this.eventValve.resetForTask(subtask);
    }

    @Override
    public void subtaskReset(int subtask, long checkpointId) {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.coordinator.subtaskReset(subtask, checkpointId);
    }

    @Override
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
        this.mainThreadExecutor.execute(() -> this.checkpointCoordinatorInternal(checkpointId, result));
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        this.mainThreadExecutor.execute(() -> this.coordinator.notifyCheckpointComplete(checkpointId));
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        this.mainThreadExecutor.execute(() -> this.coordinator.notifyCheckpointAborted(checkpointId));
    }

    @Override
    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
        this.eventValve.reset();
        if (this.context != null) {
            this.context.resetFailed();
        }
        this.coordinator.resetToCheckpoint(checkpointId, checkpointData);
    }

    private void checkpointCoordinatorInternal(long checkpointId, CompletableFuture<byte[]> result) {
        this.mainThreadExecutor.assertRunningInMainThread();
        result.whenComplete((success, failure) -> {
            if (failure != null) {
                result.completeExceptionally((Throwable)failure);
            } else {
                try {
                    this.eventValve.shutValve(checkpointId);
                    result.complete((byte[])success);
                }
                catch (Exception e) {
                    result.completeExceptionally(e);
                }
            }
        });
        try {
            this.eventValve.markForCheckpoint(checkpointId);
            this.coordinator.checkpointCoordinator(checkpointId, result);
        }
        catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM((Throwable)t);
            result.completeExceptionally(t);
            this.globalFailureHandler.accept(t);
        }
    }

    @Override
    public void afterSourceBarrierInjection(long checkpointId) {
        this.eventValve.openValveAndUnmarkCheckpoint();
    }

    @Override
    public void abortCurrentTriggering() {
        this.eventValve.openValveAndUnmarkCheckpoint();
    }

    public static OperatorCoordinatorHolder create(SerializedValue<OperatorCoordinator.Provider> serializedProvider, ExecutionJobVertex jobVertex, ClassLoader classLoader) throws Exception {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of((ClassLoader)classLoader);){
            OperatorCoordinator.Provider provider = (OperatorCoordinator.Provider)serializedProvider.deserializeValue(classLoader);
            OperatorID opId = provider.getOperatorId();
            BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender = (serializedEvent, subtask) -> {
                Execution executionAttempt = jobVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt();
                return executionAttempt.sendOperatorEvent(opId, (SerializedValue<OperatorEvent>)serializedEvent);
            };
            OperatorCoordinatorHolder operatorCoordinatorHolder = OperatorCoordinatorHolder.create(opId, provider, eventSender, jobVertex.getName(), jobVertex.getGraph().getUserClassLoader(), jobVertex.getParallelism(), jobVertex.getMaxParallelism());
            return operatorCoordinatorHolder;
        }
    }

    @VisibleForTesting
    static OperatorCoordinatorHolder create(OperatorID opId, OperatorCoordinator.Provider coordinatorProvider, BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender, String operatorName, ClassLoader userCodeClassLoader, int operatorParallelism, int operatorMaxParallelism) throws Exception {
        OperatorEventValve valve = new OperatorEventValve(eventSender);
        LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(opId, valve, operatorName, userCodeClassLoader, operatorParallelism);
        OperatorCoordinator coordinator = coordinatorProvider.create(context);
        return new OperatorCoordinatorHolder(opId, coordinator, context, valve, operatorParallelism, operatorMaxParallelism);
    }

    private static final class LazyInitializedCoordinatorContext
    implements OperatorCoordinator.Context {
        private static final Logger LOG = LoggerFactory.getLogger(LazyInitializedCoordinatorContext.class);
        private final OperatorID operatorId;
        private final OperatorEventValve eventValve;
        private final String operatorName;
        private final ClassLoader userCodeClassLoader;
        private final int operatorParallelism;
        private Consumer<Throwable> globalFailureHandler;
        private Executor schedulerExecutor;
        private volatile boolean failed;

        public LazyInitializedCoordinatorContext(OperatorID operatorId, OperatorEventValve eventValve, String operatorName, ClassLoader userCodeClassLoader, int operatorParallelism) {
            this.operatorId = (OperatorID)((Object)Preconditions.checkNotNull((Object)((Object)operatorId)));
            this.eventValve = (OperatorEventValve)Preconditions.checkNotNull((Object)eventValve);
            this.operatorName = (String)Preconditions.checkNotNull((Object)operatorName);
            this.userCodeClassLoader = (ClassLoader)Preconditions.checkNotNull((Object)userCodeClassLoader);
            this.operatorParallelism = operatorParallelism;
        }

        void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor schedulerExecutor) {
            this.globalFailureHandler = (Consumer)Preconditions.checkNotNull(globalFailureHandler);
            this.schedulerExecutor = (Executor)Preconditions.checkNotNull((Object)schedulerExecutor);
        }

        void unInitialize() {
            this.globalFailureHandler = null;
            this.schedulerExecutor = null;
        }

        boolean isInitialized() {
            return this.schedulerExecutor != null;
        }

        private void checkInitialized() {
            Preconditions.checkState((boolean)this.isInitialized(), (Object)"Context was not yet initialized");
        }

        void resetFailed() {
            this.failed = false;
        }

        @Override
        public OperatorID getOperatorId() {
            return this.operatorId;
        }

        @Override
        public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask) {
            SerializedValue serializedEvent;
            this.checkInitialized();
            if (targetSubtask < 0 || targetSubtask >= this.currentParallelism()) {
                throw new IllegalArgumentException(String.format("subtask index %d out of bounds [0, %d).", targetSubtask, this.currentParallelism()));
            }
            try {
                serializedEvent = new SerializedValue((Object)evt);
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Cannot serialize operator event", (Throwable)e);
            }
            return this.eventValve.sendEvent((SerializedValue<OperatorEvent>)serializedEvent, targetSubtask);
        }

        @Override
        public void failJob(Throwable cause) {
            this.checkInitialized();
            if (this.failed) {
                LOG.warn("Ignoring the request to fail job because the job is already failing. The ignored failure cause is", cause);
                return;
            }
            this.failed = true;
            FlinkException e = new FlinkException("Global failure triggered by OperatorCoordinator for '" + this.operatorName + "' (operator " + (Object)((Object)this.operatorId) + ").", cause);
            this.schedulerExecutor.execute(() -> this.globalFailureHandler.accept(e));
        }

        @Override
        public int currentParallelism() {
            return this.operatorParallelism;
        }

        @Override
        public ClassLoader getUserCodeClassloader() {
            return this.userCodeClassLoader;
        }
    }
}

