package org.apache.flink.runtime.concurrent;

import akka.dispatch.OnComplete;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.SupplierWithException;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils.class */
public class FutureUtils {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils$CompletionConjunctFuture.class */
    public static final class CompletionConjunctFuture extends ConjunctFuture<Void> {
        private final Object lock;
        private final int numFuturesTotal;
        private final Collection<? extends CompletableFuture<?>> futuresToComplete;
        private int futuresCompleted;
        private Throwable globalThrowable;

        private CompletionConjunctFuture(Collection<? extends CompletableFuture<?>> collection) {
            this.lock = new Object();
            this.futuresToComplete = (Collection) Preconditions.checkNotNull(collection);
            this.numFuturesTotal = collection.size();
            this.futuresCompleted = 0;
            this.globalThrowable = null;
            if (collection.isEmpty()) {
                complete(null);
                return;
            }
            Iterator<? extends CompletableFuture<?>> it = collection.iterator();
            while (it.hasNext()) {
                it.next().whenComplete(this::completeFuture);
            }
        }

        private void completeFuture(Object obj, Throwable th) {
            synchronized (this.lock) {
                this.futuresCompleted++;
                if (th != null) {
                    this.globalThrowable = ExceptionUtils.firstOrSuppressed(th, this.globalThrowable);
                }
                if (this.futuresCompleted == this.numFuturesTotal) {
                    if (this.globalThrowable != null) {
                        completeExceptionally(this.globalThrowable);
                    } else {
                        complete(null);
                    }
                }
            }
        }

        @Override // org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture
        public int getNumFuturesTotal() {
            return this.numFuturesTotal;
        }

        @Override // org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture
        public int getNumFuturesCompleted() {
            int i;
            synchronized (this.lock) {
                i = this.futuresCompleted;
            }
            return i;
        }

        @Override // org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture
        protected Collection<? extends CompletableFuture<?>> getConjunctFutures() {
            return this.futuresToComplete;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils$ConjunctFuture.class */
    public static abstract class ConjunctFuture<T> extends CompletableFuture<T> {
        public abstract int getNumFuturesTotal();

        public abstract int getNumFuturesCompleted();

        protected abstract Collection<? extends CompletableFuture<?>> getConjunctFutures();

        @Override // java.util.concurrent.CompletableFuture, java.util.concurrent.Future
        public boolean cancel(boolean z) {
            Iterator<? extends CompletableFuture<?>> it = getConjunctFutures().iterator();
            while (it.hasNext()) {
                it.next().cancel(z);
            }
            return super.cancel(z);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils$Delayer.class */
    private static final class Delayer {
        static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("FlinkCompletableFutureDelayScheduler"));

        private Delayer() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static ScheduledFuture<?> delay(Runnable runnable, long j, TimeUnit timeUnit) {
            Preconditions.checkNotNull(runnable);
            Preconditions.checkNotNull(timeUnit);
            return delayer.schedule(runnable, j, timeUnit);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils$ResultConjunctFuture.class */
    public static class ResultConjunctFuture<T> extends ConjunctFuture<Collection<T>> {
        private final Collection<? extends CompletableFuture<? extends T>> resultFutures;
        private final int numTotal;
        private final AtomicInteger nextIndex = new AtomicInteger(0);
        private final AtomicInteger numCompleted = new AtomicInteger(0);
        private volatile T[] results;

        private void handleCompletedFuture(T t, Throwable th) {
            if (th != null) {
                completeExceptionally(th);
                return;
            }
            this.results[this.nextIndex.getAndIncrement()] = t;
            if (this.numCompleted.incrementAndGet() == this.numTotal) {
                complete(Arrays.asList(this.results));
            }
        }

        ResultConjunctFuture(Collection<? extends CompletableFuture<? extends T>> collection) {
            this.resultFutures = (Collection) Preconditions.checkNotNull(collection);
            this.numTotal = collection.size();
            this.results = (T[]) new Object[this.numTotal];
            if (collection.isEmpty()) {
                complete(Collections.emptyList());
                return;
            }
            Iterator<? extends CompletableFuture<? extends T>> it = collection.iterator();
            while (it.hasNext()) {
                it.next().whenComplete(this::handleCompletedFuture);
            }
        }

        @Override // org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture
        public int getNumFuturesTotal() {
            return this.numTotal;
        }

        @Override // org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture
        public int getNumFuturesCompleted() {
            return this.numCompleted.get();
        }

        @Override // org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture
        protected Collection<? extends CompletableFuture<?>> getConjunctFutures() {
            return this.resultFutures;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils$RetryException.class */
    public static class RetryException extends Exception {
        private static final long serialVersionUID = 3613470781274141862L;

        public RetryException(String str) {
            super(str);
        }

        public RetryException(String str, Throwable th) {
            super(str, th);
        }

        public RetryException(Throwable th) {
            super(th);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils$Timeout.class */
    private static final class Timeout implements Runnable {
        private final CompletableFuture<?> future;

        private Timeout(CompletableFuture<?> completableFuture) {
            this.future = (CompletableFuture) Preconditions.checkNotNull(completableFuture);
        }

        @Override // java.lang.Runnable
        public void run() {
            this.future.completeExceptionally(new TimeoutException());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils$WaitingConjunctFuture.class */
    public static final class WaitingConjunctFuture extends ConjunctFuture<Void> {
        private final Collection<? extends CompletableFuture<?>> futures;
        private final AtomicInteger numCompleted;
        private final int numTotal;

        private void handleCompletedFuture(Object obj, Throwable th) {
            if (th != null) {
                completeExceptionally(th);
            } else if (this.numTotal == this.numCompleted.incrementAndGet()) {
                complete(null);
            }
        }

        private WaitingConjunctFuture(Collection<? extends CompletableFuture<?>> collection) {
            this.numCompleted = new AtomicInteger(0);
            this.futures = (Collection) Preconditions.checkNotNull(collection);
            this.numTotal = collection.size();
            if (collection.isEmpty()) {
                complete(null);
                return;
            }
            Iterator<? extends CompletableFuture<?>> it = collection.iterator();
            while (it.hasNext()) {
                it.next().whenComplete(this::handleCompletedFuture);
            }
        }

        @Override // org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture
        public int getNumFuturesTotal() {
            return this.numTotal;
        }

        @Override // org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture
        public int getNumFuturesCompleted() {
            return this.numCompleted.get();
        }

        @Override // org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture
        protected Collection<? extends CompletableFuture<?>> getConjunctFutures() {
            return this.futures;
        }
    }

    public static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> supplier, int i, Executor executor) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        retryOperation(completableFuture, supplier, i, executor);
        return completableFuture;
    }

    private static <T> void retryOperation(CompletableFuture<T> completableFuture, Supplier<CompletableFuture<T>> supplier, int i, Executor executor) {
        if (completableFuture.isDone()) {
            return;
        }
        CompletableFuture<T> completableFuture2 = supplier.get();
        completableFuture2.whenCompleteAsync((BiConsumer) (obj, th) -> {
            if (th == null) {
                completableFuture.complete(obj);
                return;
            }
            if (th instanceof CancellationException) {
                completableFuture.completeExceptionally(new RetryException("Operation future was cancelled.", th));
            } else if (i > 0) {
                retryOperation(completableFuture, supplier, i - 1, executor);
            } else {
                completableFuture.completeExceptionally(new RetryException("Could not complete the operation. Number of retries has been exhausted.", th));
            }
        }, executor);
        completableFuture.whenComplete((BiConsumer) (obj2, th2) -> {
            completableFuture2.cancel(false);
        });
    }

    public static <T> CompletableFuture<T> retryWithDelay(Supplier<CompletableFuture<T>> supplier, int i, Time time, Predicate<Throwable> predicate, ScheduledExecutor scheduledExecutor) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        retryOperationWithDelay(completableFuture, supplier, i, time, predicate, scheduledExecutor);
        return completableFuture;
    }

    public static <T> CompletableFuture<T> retryWithDelay(Supplier<CompletableFuture<T>> supplier, int i, Time time, ScheduledExecutor scheduledExecutor) {
        return retryWithDelay(supplier, i, time, th -> {
            return true;
        }, scheduledExecutor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> void retryOperationWithDelay(CompletableFuture<T> completableFuture, Supplier<CompletableFuture<T>> supplier, int i, Time time, Predicate<Throwable> predicate, ScheduledExecutor scheduledExecutor) {
        if (completableFuture.isDone()) {
            return;
        }
        CompletableFuture<T> completableFuture2 = supplier.get();
        completableFuture2.whenComplete((BiConsumer) (obj, th) -> {
            if (th == null) {
                completableFuture.complete(obj);
                return;
            }
            if (th instanceof CancellationException) {
                completableFuture.completeExceptionally(new RetryException("Operation future was cancelled.", th));
                return;
            }
            if (i <= 0 || !predicate.test(th)) {
                completableFuture.completeExceptionally(new RetryException("Could not complete the operation. " + (i == 0 ? "Number of retries has been exhausted." : "Exception is not retryable."), th));
            } else {
                ScheduledFuture<?> schedule = scheduledExecutor.schedule(() -> {
                    retryOperationWithDelay(completableFuture, supplier, i - 1, time, predicate, scheduledExecutor);
                }, time.toMilliseconds(), TimeUnit.MILLISECONDS);
                completableFuture.whenComplete((obj, th) -> {
                    schedule.cancel(false);
                });
            }
        });
        completableFuture.whenComplete((BiConsumer) (obj2, th2) -> {
            completableFuture2.cancel(false);
        });
    }

    public static <T> CompletableFuture<T> retrySuccesfulWithDelay(Supplier<CompletableFuture<T>> supplier, Time time, Deadline deadline, Predicate<T> predicate, ScheduledExecutor scheduledExecutor) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        retrySuccessfulOperationWithDelay(completableFuture, supplier, time, deadline, predicate, scheduledExecutor);
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> void retrySuccessfulOperationWithDelay(CompletableFuture<T> completableFuture, Supplier<CompletableFuture<T>> supplier, Time time, Deadline deadline, Predicate<T> predicate, ScheduledExecutor scheduledExecutor) {
        if (completableFuture.isDone()) {
            return;
        }
        CompletableFuture<T> completableFuture2 = supplier.get();
        completableFuture2.whenComplete((BiConsumer) (obj, th) -> {
            if (th != null) {
                if (th instanceof CancellationException) {
                    completableFuture.completeExceptionally(new RetryException("Operation future was cancelled.", th));
                    return;
                } else {
                    completableFuture.completeExceptionally(th);
                    return;
                }
            }
            if (predicate.test(obj)) {
                completableFuture.complete(obj);
            } else if (!deadline.hasTimeLeft()) {
                completableFuture.completeExceptionally(new RetryException("Could not satisfy the predicate within the allowed time."));
            } else {
                ScheduledFuture<?> schedule = scheduledExecutor.schedule(() -> {
                    retrySuccessfulOperationWithDelay(completableFuture, supplier, time, deadline, predicate, scheduledExecutor);
                }, time.toMilliseconds(), TimeUnit.MILLISECONDS);
                completableFuture.whenComplete((obj, th) -> {
                    schedule.cancel(false);
                });
            }
        });
        completableFuture.whenComplete((BiConsumer) (obj2, th2) -> {
            completableFuture2.cancel(false);
        });
    }

    public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> completableFuture, long j, TimeUnit timeUnit) {
        if (!completableFuture.isDone()) {
            ScheduledFuture delay = Delayer.delay(new Timeout(completableFuture), j, timeUnit);
            completableFuture.whenComplete((BiConsumer) (obj, th) -> {
                if (delay.isDone()) {
                    return;
                }
                delay.cancel(false);
            });
        }
        return completableFuture;
    }

    public static CompletableFuture<Void> runAfterwards(CompletableFuture<?> completableFuture, RunnableWithException runnableWithException) {
        return runAfterwardsAsync(completableFuture, runnableWithException, Executors.directExecutor());
    }

    public static CompletableFuture<Void> runAfterwardsAsync(CompletableFuture<?> completableFuture, RunnableWithException runnableWithException) {
        return runAfterwardsAsync(completableFuture, runnableWithException, ForkJoinPool.commonPool());
    }

    public static CompletableFuture<Void> runAfterwardsAsync(CompletableFuture<?> completableFuture, RunnableWithException runnableWithException, Executor executor) {
        CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
        completableFuture.whenCompleteAsync((obj, th) -> {
            try {
                runnableWithException.run();
            } catch (Throwable th) {
                th = ExceptionUtils.firstOrSuppressed(th, th);
            }
            if (th != null) {
                completableFuture2.completeExceptionally(th);
            } else {
                completableFuture2.complete(null);
            }
        }, executor);
        return completableFuture2;
    }

    public static CompletableFuture<Void> composeAfterwards(CompletableFuture<?> completableFuture, Supplier<CompletableFuture<?>> supplier) {
        CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
        completableFuture.whenComplete((obj, th) -> {
            ((CompletableFuture) supplier.get()).whenComplete((obj, th) -> {
                if (th != null) {
                    completableFuture2.completeExceptionally(ExceptionUtils.firstOrSuppressed(th, th));
                } else if (th != null) {
                    completableFuture2.completeExceptionally(th);
                } else {
                    completableFuture2.complete(null);
                }
            });
        });
        return completableFuture2;
    }

    public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends CompletableFuture<? extends T>> collection) {
        Preconditions.checkNotNull(collection, "futures");
        return new ResultConjunctFuture(collection);
    }

    public static ConjunctFuture<Void> waitForAll(Collection<? extends CompletableFuture<?>> collection) {
        Preconditions.checkNotNull(collection, "futures");
        return new WaitingConjunctFuture(collection);
    }

    public static ConjunctFuture<Void> completeAll(Collection<? extends CompletableFuture<?>> collection) {
        return new CompletionConjunctFuture(collection);
    }

    public static <T> CompletableFuture<T> completedExceptionally(Throwable th) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        completableFuture.completeExceptionally(th);
        return completableFuture;
    }

    public static <T> CompletableFuture<T> supplyAsync(SupplierWithException<T, ?> supplierWithException, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return supplierWithException.get();
            } catch (Throwable th) {
                throw new CompletionException(th);
            }
        }, executor);
    }

    public static FiniteDuration toFiniteDuration(Time time) {
        return new FiniteDuration(time.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    public static Time toTime(FiniteDuration finiteDuration) {
        return Time.milliseconds(finiteDuration.toMillis());
    }

    public static <T> CompletableFuture<T> toJava(Future<T> future) {
        final CompletableFuture<T> completableFuture = new CompletableFuture<>();
        future.onComplete(new OnComplete<T>() { // from class: org.apache.flink.runtime.concurrent.FutureUtils.1
            @Override // akka.dispatch.OnComplete
            public void onComplete(Throwable th, T t) {
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                } else {
                    completableFuture.complete(t);
                }
            }
        }, Executors.directExecutionContext());
        return completableFuture;
    }
}
