/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.operation;

import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberLeftException;
import com.hazelcast.jet.DAG;
import com.hazelcast.jet.TopologyChangedException;
import com.hazelcast.jet.impl.JetService;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.operation.AsyncExecutionOperation;
import com.hazelcast.jet.impl.operation.CompleteOperation;
import com.hazelcast.jet.impl.operation.ExecuteOperation;
import com.hazelcast.jet.impl.operation.InitOperation;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.spi.Operation;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Operation that runs a {@link DAG} on the members for which
 * {@link JetService#createExecutionPlans} produced an {@link ExecutionPlan}.
 *
 * <p>The job is driven in three phases, each invoked on every member of the plan map:
 * <ol>
 *     <li>{@link InitOperation} - ships the member's execution plan;</li>
 *     <li>{@link ExecuteOperation} - starts the execution (only if init succeeded);</li>
 *     <li>{@link CompleteOperation} - always invoked, carries the execution error (or
 *     {@code null}) to the members.</li>
 * </ol>
 * Once both the execution and the completion phase have finished, the first error seen (or
 * {@code null}) is sent as the response of this operation.
 */
public class ExecuteJobOperation extends AsyncExecutionOperation {

    /** Service name the per-member invocations are addressed to. */
    private static final String JET_SERVICE_NAME = "hz:impl:jetService";

    /** Try count used for the init and execute invocations. */
    private static final int DEFAULT_TRY_COUNT = 250;

    /** Try count used for the completion invocations. */
    private static final int COMPLETION_TRY_COUNT = 3;

    private DAG dag;

    /**
     * Composite future of the execute-phase invocations. It is {@code null} until the init
     * phase has succeeded; written from a future callback and read by {@link #cancel()},
     * hence {@code volatile}.
     */
    private volatile CompletableFuture<Object> executionInvocationFuture;

    /**
     * @param executionId id passed on to the superclass and to every per-member operation
     * @param dag         the DAG to execute
     */
    public ExecuteJobOperation(long executionId, DAG dag) {
        super(executionId);
        this.dag = dag;
    }

    // NOTE(review): presumably only used when the operation is deserialized (the state is
    // then restored by readInternal) - confirm against the serialization hook.
    private ExecuteJobOperation() {
    }

    /**
     * Builds the execution plans and chains the init, execute and complete phases.
     * The method itself returns right after the init invocations were issued; the response is
     * sent from the callback of the last stage.
     */
    @Override
    protected void doRun() throws Exception {
        long start = System.currentTimeMillis();
        getLogger().info("Start executing job " + executionId + ": " + dag);
        JetService service = getService();
        getLogger().fine("Building execution plan for job " + executionId + '.');
        Map<Member, ExecutionPlan> executionPlanMap = service.createExecutionPlans(dag);
        getLogger().fine("Built execution plan for job " + executionId + '.');

        CompletableFuture<Object> init = invokeOnCluster(executionPlanMap,
                plan -> new InitOperation(executionId, plan), DEFAULT_TRY_COUNT);
        // Completes only if init fails; stays pending forever otherwise.
        CompletableFuture<Throwable> initFailed = onException(init);
        // Completed by the done-callbacks once every execute invocation is done.
        CompletableFuture<Void> executionDone = new CompletableFuture<>();

        // Execute phase: started only after a successful init. The stage never completes
        // exceptionally - its value is the peeled error, or null on success.
        CompletableFuture<Throwable> execution = init
                .thenCompose(x -> {
                    executionInvocationFuture = invokeOnCluster(executionPlanMap,
                            plan -> new ExecuteOperation(executionId), executionDone, true, DEFAULT_TRY_COUNT);
                    return executionInvocationFuture;
                })
                .handle((v, e) -> e != null ? ExceptionUtil.peel(e) : null);

        // Complete phase: waits until either init failed or all execute invocations are done,
        // AND the execution stage has produced its result; then forwards that result.
        CompletableFuture<Throwable> completion = CompletableFuture.anyOf(initFailed, executionDone)
                .thenCombine(execution, (ignored, executionError) -> executionError)
                .thenCompose(executionError -> invokeOnCluster(executionPlanMap,
                        plan -> new CompleteOperation(executionId, topologyChangeOrIdentity(executionError)),
                        COMPLETION_TRY_COUNT))
                .handle((v, e) -> e != null ? ExceptionUtil.peel(e) : null);

        // The execution error takes precedence over a completion error.
        execution
                .thenCombine(completion,
                        (executionError, completionError) -> executionError == null ? completionError : executionError)
                .exceptionally(Function.identity())
                .thenAccept(error -> {
                    long elapsed = System.currentTimeMillis() - start;
                    getLogger().info("Execution of job " + executionId + " completed in " + elapsed + "ms.");
                    doSendResponse(topologyChangeOrIdentity(error));
                });
    }

    /**
     * Wraps a {@link MemberLeftException} into a {@link TopologyChangedException}; any other
     * value (including {@code null}) is returned unchanged.
     */
    private Throwable topologyChangeOrIdentity(Throwable e) {
        if (e instanceof MemberLeftException) {
            return new TopologyChangedException("Topology has been changed", e);
        }
        return e;
    }

    /**
     * Invokes an operation on every member of {@code memberMap} without error propagation and
     * without exposing the "all invocations done" signal.
     */
    private <E> CompletableFuture<Object> invokeOnCluster(Map<Member, E> memberMap, Function<E, Operation> func,
                                                          int tryCount) {
        return invokeOnCluster(memberMap, func, new CompletableFuture<Void>(), false, tryCount);
    }

    /**
     * Invokes the operation created by {@code func} on the address of every member in
     * {@code memberMap}.
     *
     * @param memberMap      target members and the value handed to {@code func} for each of them
     * @param func           creates the operation for one member
     * @param doneFuture     completed with {@code null} when the done-callback of every invocation
     *                       has fired (immediately, if {@code memberMap} is empty)
     * @param propagateError if {@code true}, the first failure cancels all the invocations
     * @param tryCount       try count configured on each invocation
     * @return composite future, see {@link #allOf}
     */
    private <E> CompletableFuture<Object> invokeOnCluster(Map<Member, E> memberMap, Function<E, Operation> func,
                                                          CompletableFuture<Void> doneFuture,
                                                          boolean propagateError, int tryCount) {
        if (memberMap.isEmpty()) {
            // No invocation means no done-callback: signal completion right away instead of
            // leaving the future pending forever.
            doneFuture.complete(null);
        }
        AtomicInteger doneLatch = new AtomicInteger(memberMap.size());
        Stream<ICompletableFuture<Object>> invocations = memberMap.entrySet().stream()
                .map(entry -> getNodeEngine().getOperationService()
                        .createInvocationBuilder(JET_SERVICE_NAME, func.apply(entry.getValue()),
                                entry.getKey().getAddress())
                        .setDoneCallback(() -> {
                            if (doneLatch.decrementAndGet() == 0) {
                                doneFuture.complete(null);
                            }
                        })
                        .setTryCount(tryCount)
                        .invoke());
        // The stream is lazy: the invocations are actually issued while collecting.
        return allOf(invocations.collect(Collectors.toList()), propagateError);
    }

    /**
     * Cancels the execute-phase invocations, if that phase has already been started.
     *
     * <p>NOTE(review): while the init phase is still running the future is {@code null} and this
     * call does nothing - the job is then not cancelled. Confirm whether that is intended.
     */
    @Override
    public void cancel() {
        // Single volatile read, so the null check and the use see the same value.
        CompletableFuture<Object> invocationFuture = executionInvocationFuture;
        if (invocationFuture != null) {
            getLogger().info("Cancelling job " + executionId);
            invocationFuture.cancel(true);
        }
    }

    @Override
    protected void writeInternal(ObjectDataOutput out) throws IOException {
        super.writeInternal(out);
        out.writeObject(dag);
    }

    @Override
    protected void readInternal(ObjectDataInput in) throws IOException {
        super.readInternal(in);
        dag = in.readObject();
    }

    /**
     * Combines the given futures into one that completes after ALL of them have completed:
     * with {@code null} if none failed, otherwise exceptionally with the first recorded error.
     * Cancelling the returned future cancels all the underlying futures.
     *
     * @param futures        futures to wait for; if empty, the result is already completed
     * @param propagateError if {@code true}, the first failure cancels all the futures
     */
    private static CompletableFuture<Object> allOf(Collection<ICompletableFuture<Object>> futures,
                                                   boolean propagateError) {
        CompletableFuture<Object> compositeFuture = new CompletableFuture<>();
        if (futures.isEmpty()) {
            // No callback would ever fire, so the future would otherwise stay pending forever.
            compositeFuture.complete(null);
            return compositeFuture;
        }
        compositeFuture.whenComplete((r, e) -> {
            if (e instanceof CancellationException) {
                futures.forEach(f -> f.cancel(true));
            }
        });
        AtomicInteger completionLatch = new AtomicInteger(futures.size());
        AtomicReference<Throwable> firstError = new AtomicReference<>();
        futures.forEach(future -> future.andThen(new Callback(
                response -> {
                    if (completionLatch.decrementAndGet() == 0) {
                        finish(compositeFuture, firstError.get());
                    }
                },
                error -> {
                    boolean isFirstError = firstError.compareAndSet(null, error);
                    // Cancelling once is enough: the failures provoked by this cancellation
                    // arrive here as well and must not re-cancel everything again.
                    if (isFirstError && propagateError) {
                        futures.forEach(sub -> sub.cancel(true));
                    }
                    if (completionLatch.decrementAndGet() == 0) {
                        finish(compositeFuture, firstError.get());
                    }
                })));
        return compositeFuture;
    }

    /** Completes {@code future} normally if {@code error} is {@code null}, exceptionally otherwise. */
    private static void finish(CompletableFuture<Object> future, Throwable error) {
        if (error == null) {
            future.complete(null);
        } else {
            future.completeExceptionally(error);
        }
    }

    /**
     * Returns a future that completes normally with the exception of {@code stage} if the stage
     * fails. If the stage succeeds, the returned future is never completed.
     */
    private static <T> CompletableFuture<Throwable> onException(CompletableFuture<T> stage) {
        CompletableFuture<Throwable> f = new CompletableFuture<>();
        stage.whenComplete((r, e) -> {
            if (e != null) {
                f.complete(e);
            }
        });
        return f;
    }

    /** Adapts a pair of consumers to the {@link ExecutionCallback} interface. */
    private static final class Callback implements ExecutionCallback<Object> {
        private final Consumer<Object> onResponse;
        private final Consumer<Throwable> onError;

        private Callback(Consumer<Object> onResponse, Consumer<Throwable> onError) {
            this.onResponse = onResponse;
            this.onError = onError;
        }

        @Override
        public void onResponse(Object response) {
            onResponse.accept(response);
        }

        @Override
        public void onFailure(Throwable t) {
            onError.accept(t);
        }
    }
}

