package org.apache.flink.runtime.source.coordinator;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.ThrowableCatchingRunnable;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.class */
public class SourceCoordinatorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT>, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinatorContext.class);
    private final ScheduledExecutorService workerExecutor;
    private final ScheduledExecutorService coordinatorExecutor;
    private final ExecutorNotifier notifier;
    private final OperatorCoordinator.Context operatorCoordinatorContext;
    private final SimpleVersionedSerializer<SplitT> splitSerializer;
    private final ConcurrentMap<Integer, ReaderInfo> registeredReaders;
    private final SplitAssignmentTracker<SplitT> assignmentTracker;
    private final SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory;
    private final OperatorCoordinator.SubtaskGateway[] subtaskGateways;
    private final String coordinatorThreadName;
    private volatile boolean closed;

    public SourceCoordinatorContext(SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorExecutorThreadFactory, int i, OperatorCoordinator.Context context, SimpleVersionedSerializer<SplitT> simpleVersionedSerializer) {
        this(Executors.newScheduledThreadPool(1, coordinatorExecutorThreadFactory), Executors.newScheduledThreadPool(i, new ExecutorThreadFactory(coordinatorExecutorThreadFactory.getCoordinatorThreadName() + "-worker")), coordinatorExecutorThreadFactory, context, simpleVersionedSerializer, new SplitAssignmentTracker());
    }

    @VisibleForTesting
    SourceCoordinatorContext(ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2, SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorExecutorThreadFactory, OperatorCoordinator.Context context, SimpleVersionedSerializer<SplitT> simpleVersionedSerializer, SplitAssignmentTracker<SplitT> splitAssignmentTracker) {
        this.workerExecutor = scheduledExecutorService2;
        this.coordinatorExecutor = scheduledExecutorService;
        this.coordinatorThreadFactory = coordinatorExecutorThreadFactory;
        this.operatorCoordinatorContext = context;
        this.splitSerializer = simpleVersionedSerializer;
        this.registeredReaders = new ConcurrentHashMap();
        this.assignmentTracker = splitAssignmentTracker;
        this.coordinatorThreadName = coordinatorExecutorThreadFactory.getCoordinatorThreadName();
        this.subtaskGateways = new OperatorCoordinator.SubtaskGateway[context.currentParallelism()];
        this.notifier = new ExecutorNotifier(scheduledExecutorService2, runnable -> {
            scheduledExecutorService.execute(new ThrowableCatchingRunnable(this::handleUncaughtExceptionFromAsyncCall, runnable));
        });
    }

    public SplitEnumeratorMetricGroup metricGroup() {
        return null;
    }

    public void sendEventToSourceReader(int i, SourceEvent sourceEvent) {
        checkSubtaskIndex(i);
        callInCoordinatorThread(() -> {
            getGatewayAndCheckReady(i).sendEvent(new SourceEventWrapper(sourceEvent));
            return null;
        }, String.format("Failed to send event %s to subtask %d", sourceEvent, Integer.valueOf(i)));
    }

    public int currentParallelism() {
        return this.operatorCoordinatorContext.currentParallelism();
    }

    public Map<Integer, ReaderInfo> registeredReaders() {
        return Collections.unmodifiableMap(this.registeredReaders);
    }

    public void assignSplits(SplitsAssignment<SplitT> splitsAssignment) {
        callInCoordinatorThread(() -> {
            for (Integer num : splitsAssignment.assignment().keySet()) {
                if (!this.registeredReaders.containsKey(num)) {
                    throw new IllegalArgumentException(String.format("Cannot assign splits %s to subtask %d because the subtask is not registered.", this.registeredReaders.get(num), num));
                }
            }
            this.assignmentTracker.recordSplitAssignment(splitsAssignment);
            splitsAssignment.assignment().forEach((num2, list) -> {
                try {
                    getGatewayAndCheckReady(num2.intValue()).sendEvent(new AddSplitEvent(list, this.splitSerializer));
                } catch (IOException e) {
                    throw new FlinkRuntimeException("Failed to serialize splits.", e);
                }
            });
            return null;
        }, String.format("Failed to assign splits %s due to ", splitsAssignment));
    }

    public void signalNoMoreSplits(int i) {
        checkSubtaskIndex(i);
        callInCoordinatorThread(() -> {
            getGatewayAndCheckReady(i).sendEvent(new NoMoreSplitsEvent());
            return null;
        }, "Failed to send 'NoMoreSplits' to reader " + i);
    }

    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> biConsumer, long j, long j2) {
        this.notifier.notifyReadyAsync(callable, biConsumer, j, j2);
    }

    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> biConsumer) {
        this.notifier.notifyReadyAsync(callable, biConsumer);
    }

    public void runInCoordinatorThread(Runnable runnable) {
        this.coordinatorExecutor.execute(runnable);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws InterruptedException {
        this.closed = true;
        ComponentClosingUtils.shutdownExecutorForcefully(this.workerExecutor, Duration.ofNanos(CheckpointOptions.NO_ALIGNED_CHECKPOINT_TIME_OUT));
        ComponentClosingUtils.shutdownExecutorForcefully(this.coordinatorExecutor, Duration.ofNanos(CheckpointOptions.NO_ALIGNED_CHECKPOINT_TIME_OUT));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void subtaskReady(OperatorCoordinator.SubtaskGateway subtaskGateway) {
        int subtask = subtaskGateway.getSubtask();
        if (this.subtaskGateways[subtask] != null) {
            throw new IllegalStateException("Already have a subtask gateway for " + subtask);
        }
        this.subtaskGateways[subtaskGateway.getSubtask()] = subtaskGateway;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void subtaskNotReady(int i) {
        this.subtaskGateways[i] = null;
    }

    OperatorCoordinator.SubtaskGateway getGatewayAndCheckReady(int i) {
        OperatorCoordinator.SubtaskGateway subtaskGateway = this.subtaskGateways[i];
        if (subtaskGateway != null) {
            return subtaskGateway;
        }
        throw new IllegalStateException(String.format("Subtask %d is not ready yet to receive events.", Integer.valueOf(i)));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void failJob(Throwable th) {
        this.operatorCoordinatorContext.failJob(th);
    }

    void handleUncaughtExceptionFromAsyncCall(Throwable th) {
        if (this.closed) {
            return;
        }
        ExceptionUtils.rethrowIfFatalErrorOrOOM(th);
        LOG.error("Exception while handling result from async call in {}. Triggering job failover.", this.coordinatorThreadName, th);
        failJob(th);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onCheckpoint(long j) throws Exception {
        this.assignmentTracker.onCheckpoint(j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerSourceReader(ReaderInfo readerInfo) {
        ReaderInfo put = this.registeredReaders.put(Integer.valueOf(readerInfo.getSubtaskId()), readerInfo);
        if (put != null) {
            throw new IllegalStateException("Overwriting " + put + " with " + readerInfo);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void unregisterSourceReader(int i) {
        this.registeredReaders.remove(Integer.valueOf(i));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<SplitT> getAndRemoveUncheckpointedAssignment(int i, long j) {
        return this.assignmentTracker.getAndRemoveUncheckpointedAssignment(i, j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onCheckpointComplete(long j) {
        this.assignmentTracker.onCheckpointComplete(j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OperatorCoordinator.Context getCoordinatorContext() {
        return this.operatorCoordinatorContext;
    }

    private void checkSubtaskIndex(int i) {
        if (i < 0 || i >= getCoordinatorContext().currentParallelism()) {
            throw new IllegalArgumentException(String.format("Subtask index %d is out of bounds [0, %s)", Integer.valueOf(i), Integer.valueOf(getCoordinatorContext().currentParallelism())));
        }
    }

    private <V> V callInCoordinatorThread(Callable<V> callable, String str) {
        if (this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
            try {
                return callable.call();
            } catch (Throwable th) {
                LOG.error("Uncaught Exception in Source Coordinator Executor", th);
                throw new FlinkRuntimeException(str, th);
            }
        }
        try {
            return (V) this.coordinatorExecutor.submit(() -> {
                try {
                    return callable.call();
                } catch (Throwable th2) {
                    LOG.error("Uncaught Exception in Source Coordinator Executor", th2);
                    ExceptionUtils.rethrowException(th2);
                    return null;
                }
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new FlinkRuntimeException(str, e);
        }
    }
}
