/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.concurrent;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.util.Preconditions;
import scala.concurrent.duration.Deadline;

public class FutureUtils {
    /**
     * Runs the given operation and re-runs it whenever the future it returned completes
     * with a failure, until an attempt succeeds or the retry budget is used up.
     *
     * <p>The first attempt is started directly inside this method. The outcome of each
     * attempt is inspected in a handler registered through {@code handleAsync} with the
     * given executor; that handler also triggers the next attempt. At most
     * {@code retries + 1} attempts are made; with {@code retries <= 0} the operation is
     * attempted exactly once.
     *
     * <p>If {@code operation.call()} itself throws, no further attempt is made and the
     * returned future is completed exceptionally with a {@link RetryException} wrapping
     * the thrown exception.
     *
     * @param operation factory producing the future of a single attempt
     * @param retries   number of additional attempts allowed after a failed one
     * @param executor  executor handed to {@code handleAsync} for the retry handler
     * @param <T>       type of the operation's result
     * @return a future carrying the value of the first successful attempt, or a
     *         {@link RetryException} once no retries are left
     */
    public static <T> Future<T> retry(final Callable<Future<T>> operation, final int retries, final Executor executor) {
        final Future<T> attempt;
        try {
            attempt = operation.call();
        }
        catch (Exception e) {
            return FlinkCompletableFuture.completedExceptionally(new RetryException("Could not execute the provided operation.", e));
        }

        // Decides, per finished attempt, whether to pass the value on, retry, or give up.
        final BiFunction<T, Throwable, Future<T>> retryOnFailure = new BiFunction<T, Throwable, Future<T>>(){

            @Override
            public Future<T> apply(T t, Throwable throwable) {
                if (throwable == null) {
                    return FlinkCompletableFuture.completed(t);
                }
                if (retries <= 0) {
                    return FlinkCompletableFuture.completedExceptionally(new RetryException("Could not complete the operation. Number of retries has been exhausted.", throwable));
                }
                return FutureUtils.retry(operation, retries - 1, executor);
            }
        };

        // The handler yields a future itself; composing with the identity unwraps that nesting.
        final ApplyFunction<Future<T>, Future<T>> unwrap = new ApplyFunction<Future<T>, Future<T>>(){

            @Override
            public Future<T> apply(Future<T> value) {
                return value;
            }
        };

        return attempt.handleAsync(retryOnFailure, executor).thenCompose(unwrap);
    }

    /**
     * Runs the given operation repeatedly until it produces a value accepted by
     * {@code successPredicate} or the deadline has no time left.
     *
     * <p>The first attempt is started directly inside this method. Each attempt's outcome
     * is inspected in a handler registered through {@code handleAsync} with the given
     * executor. A new attempt is started when the attempt failed, or when the predicate
     * rejected its value, provided {@code deadline.hasTimeLeft()} is still true.
     *
     * <p>The returned future is completed exceptionally with a {@link RetryException} when
     * <ul>
     *   <li>{@code operation.call()} itself throws (no further attempt is made),</li>
     *   <li>an attempt failed and the deadline has passed (the message of this exception
     *       speaks of exhausted retries although the limit here is the deadline),</li>
     *   <li>the predicate throws (no further attempt is made), or</li>
     *   <li>the predicate rejected the value and the deadline has passed.</li>
     * </ul>
     *
     * @param operation        factory producing the future of a single attempt
     * @param successPredicate decides whether an attempt's value is acceptable
     * @param deadline         point in time after which no new attempt is started
     * @param executor         executor handed to {@code handleAsync} for the retry handler
     * @param <T>              type of the operation's result
     * @return a future carrying the first accepted value, or a {@link RetryException}
     */
    public static <T> Future<T> retrySuccessful(final Callable<Future<T>> operation, final FilterFunction<T> successPredicate, final Deadline deadline, final Executor executor) {
        final Future<T> attempt;
        try {
            attempt = operation.call();
        }
        catch (Exception e) {
            return FlinkCompletableFuture.completedExceptionally(new RetryException("Could not execute the provided operation.", e));
        }

        // Decides, per finished attempt, whether to pass the value on, retry, or give up.
        final BiFunction<T, Throwable, Future<T>> retryUntilAccepted = new BiFunction<T, Throwable, Future<T>>(){

            @Override
            public Future<T> apply(T t, Throwable throwable) {
                if (throwable != null) {
                    if (!deadline.hasTimeLeft()) {
                        return FlinkCompletableFuture.completedExceptionally(new RetryException("Could not complete the operation. Number of retries has been exhausted.", throwable));
                    }
                    return FutureUtils.retrySuccessful(operation, successPredicate, deadline, executor);
                }

                final boolean accepted;
                try {
                    accepted = successPredicate.filter(t);
                }
                catch (Exception e) {
                    return FlinkCompletableFuture.completedExceptionally(new RetryException("Predicate threw an exception.", e));
                }

                if (accepted) {
                    return FlinkCompletableFuture.completed(t);
                }
                if (!deadline.hasTimeLeft()) {
                    return FlinkCompletableFuture.completedExceptionally(new RetryException("No time left and predicate returned false for " + t));
                }
                return FutureUtils.retrySuccessful(operation, successPredicate, deadline, executor);
            }
        };

        // The handler yields a future itself; composing with the identity unwraps that nesting.
        final ApplyFunction<Future<T>, Future<T>> unwrap = new ApplyFunction<Future<T>, Future<T>>(){

            @Override
            public Future<T> apply(Future<T> value) {
                return value;
            }
        };

        return attempt.handleAsync(retryUntilAccepted, executor).thenCompose(unwrap);
    }

    /**
     * Creates a future that completes with the values of all given futures once every one
     * of them has completed successfully.
     *
     * <p>A {@code ResultConjunctFuture} sized to the collection is created and its
     * completion handler is registered on each input future. For an empty collection the
     * returned future is completed immediately with an empty list.
     *
     * @param futures the futures to combine; must not be {@code null}
     * @param <T>     common super type of the futures' values
     * @return the conjunction of the given futures
     */
    public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends Future<? extends T>> futures) {
        Preconditions.checkNotNull(futures, "futures");

        // Parameterized instead of raw, so the compiler checks the handler and result types.
        final ResultConjunctFuture<T> conjunct = new ResultConjunctFuture<T>(futures.size());

        if (futures.isEmpty()) {
            // No handler would ever fire for an empty input, so complete right away.
            conjunct.complete(Collections.<T>emptyList());
        } else {
            for (Future<? extends T> future : futures) {
                future.handle(conjunct.completionHandler);
            }
        }
        return conjunct;
    }

    /**
     * Creates a future that completes once all given futures have completed successfully,
     * without collecting their values.
     *
     * <p>All bookkeeping is delegated to a new {@code WaitingConjunctFuture}.
     *
     * @param futures the futures to wait for; must not be {@code null}
     * @return the conjunction of the given futures
     */
    public static ConjunctFuture<Void> waitForAll(Collection<? extends Future<?>> futures) {
        Preconditions.checkNotNull(futures, "futures");

        final ConjunctFuture<Void> conjunction = new WaitingConjunctFuture(futures);
        return conjunction;
    }

    /**
     * Conjunction that only tracks completion of its input futures and carries no value.
     *
     * <p>The constructor registers one shared handler on every input future. The handler
     * counts successful completions and completes this future with {@code null} when the
     * count reaches the number of inputs; on a failed input it forwards the failure to
     * this future via {@code completeExceptionally}.
     */
    private static final class WaitingConjunctFuture
            extends FlinkCompletableFuture<Void>
            implements ConjunctFuture<Void> {

        /** Number of input futures, fixed at construction time. */
        private final int numTotal;

        /** Number of input futures that have completed successfully so far. */
        private final AtomicInteger numCompleted = new AtomicInteger(0);

        /** Handler registered on every input future; always returns {@code null}. */
        private final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>(){

            @Override
            public Void apply(Object o, Throwable throwable) {
                if (throwable != null) {
                    WaitingConjunctFuture.this.completeExceptionally(throwable);
                } else if (WaitingConjunctFuture.this.numTotal == WaitingConjunctFuture.this.numCompleted.incrementAndGet()) {
                    // This was the last outstanding input.
                    WaitingConjunctFuture.this.complete(null);
                }
                return null;
            }
        };

        /**
         * Registers the completion handler on every given future, or completes this future
         * immediately when the collection is empty.
         *
         * @param futures the futures to wait for; must not be {@code null}
         */
        private WaitingConjunctFuture(Collection<? extends Future<?>> futures) {
            Preconditions.checkNotNull(futures, "Futures must not be null.");
            this.numTotal = futures.size();

            if (futures.isEmpty()) {
                // No handler would ever fire, so there is nothing to wait for.
                this.complete(null);
                return;
            }
            for (Future<?> input : futures) {
                input.handle(this.completionHandler);
            }
        }

        @Override
        public int getNumFuturesCompleted() {
            return this.numCompleted.get();
        }

        @Override
        public int getNumFuturesTotal() {
            return this.numTotal;
        }
    }

    /**
     * Conjunction that collects the values of its input futures.
     *
     * <p>This class does not register anything itself: the caller attaches
     * {@link #completionHandler} to every input future. Each successful completion stores
     * its value in the next free slot of the result array, so the resulting collection is
     * ordered by completion, not by the order of the inputs. When as many values have been
     * stored as there are inputs, this future completes with a fixed-size list view of the
     * array. A failed input is forwarded to this future via {@code completeExceptionally}.
     *
     * @param <T> type of the collected values
     */
    private static class ResultConjunctFuture<T>
            extends FlinkCompletableFuture<Collection<T>>
            implements ConjunctFuture<Collection<T>> {

        /** Number of input futures, fixed at construction time. */
        private final int numTotal;

        /** Next free slot in {@link #results}; slots are claimed in completion order. */
        private final AtomicInteger nextIndex = new AtomicInteger(0);

        /** Number of input futures whose value has been stored so far. */
        private final AtomicInteger numCompleted = new AtomicInteger(0);

        /** Collected values; has exactly {@link #numTotal} slots. */
        private volatile T[] results;

        /** Handler to be registered on every input future; always returns {@code null}. */
        final BiFunction<T, Throwable, Void> completionHandler = new BiFunction<T, Throwable, Void>(){

            @Override
            public Void apply(T o, Throwable throwable) {
                if (throwable != null) {
                    ResultConjunctFuture.this.completeExceptionally(throwable);
                } else {
                    // Claim a slot first, store the value, and only then count the completion,
                    // so the thread that observes the final count sees every stored value.
                    int index = ResultConjunctFuture.this.nextIndex.getAndIncrement();
                    ResultConjunctFuture.this.results[index] = o;
                    if (ResultConjunctFuture.this.numCompleted.incrementAndGet() == ResultConjunctFuture.this.numTotal) {
                        ResultConjunctFuture.this.complete(Arrays.asList(ResultConjunctFuture.this.results));
                    }
                }
                return null;
            }
        };

        /**
         * Creates a conjunction expecting {@code numTotal} values.
         *
         * @param numTotal number of input futures the handler will be registered on
         */
        @SuppressWarnings("unchecked")
        ResultConjunctFuture(int numTotal) {
            this.numTotal = numTotal;
            // Generic arrays cannot be created directly. The Object[] is only ever filled with
            // T values by the completion handler and exposed through Arrays.asList, so the
            // unchecked cast is safe.
            this.results = (T[]) new Object[numTotal];
        }

        @Override
        public int getNumFuturesTotal() {
            return this.numTotal;
        }

        @Override
        public int getNumFuturesCompleted() {
            return this.numCompleted.get();
        }
    }

    public static interface ConjunctFuture<T>
    extends CompletableFuture<T> {
        public int getNumFuturesTotal();

        public int getNumFuturesCompleted();
    }

    /**
     * Checked exception used by the retry helpers of this class to report that an
     * operation could not be executed or did not succeed within its retry limits.
     */
    public static class RetryException
            extends Exception {

        private static final long serialVersionUID = 3613470781274141862L;

        /**
         * Creates an exception that wraps the given cause without a message of its own.
         *
         * @param cause the underlying failure
         */
        public RetryException(Throwable cause) {
            super(cause);
        }

        /**
         * Creates an exception with a description and the underlying failure.
         *
         * @param message description of what went wrong
         * @param cause   the underlying failure
         */
        public RetryException(String message, Throwable cause) {
            super(message, cause);
        }

        /**
         * Creates an exception with a description only.
         *
         * @param message description of what went wrong
         */
        public RetryException(String message) {
            super(message);
        }
    }
}

