package org.apache.flink.core.fs;

import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/core/fs/BackPressuringExecutor.class */
public final class BackPressuringExecutor implements Executor {
    private final Executor delegate;
    private final Semaphore permits;

    /* loaded from: input_file:org/apache/flink/core/fs/BackPressuringExecutor$SemaphoreReleasingRunnable.class */
    private static class SemaphoreReleasingRunnable implements Runnable {
        private final Runnable delegate;
        private final Semaphore toRelease;
        private final AtomicBoolean released = new AtomicBoolean();

        SemaphoreReleasingRunnable(Runnable runnable, Semaphore semaphore) {
            this.delegate = runnable;
            this.toRelease = semaphore;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.delegate.run();
            } finally {
                release();
            }
        }

        void release() {
            if (this.released.compareAndSet(false, true)) {
                this.toRelease.release();
            }
        }
    }

    public BackPressuringExecutor(Executor executor, int i) {
        Preconditions.checkArgument(i > 0, "numConcurrentExecutions must be > 0");
        this.delegate = (Executor) Preconditions.checkNotNull(executor, "delegate");
        this.permits = new Semaphore(i, true);
    }

    @Override // java.util.concurrent.Executor
    public void execute(Runnable runnable) {
        try {
            this.permits.acquire();
            SemaphoreReleasingRunnable semaphoreReleasingRunnable = new SemaphoreReleasingRunnable(runnable, this.permits);
            try {
                this.delegate.execute(semaphoreReleasingRunnable);
            } catch (Throwable th) {
                semaphoreReleasingRunnable.release();
                ExceptionUtils.rethrow(th, th.getMessage());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new FlinkRuntimeException("interrupted:", e);
        }
    }
}
