package com.hazelcast.jet.impl.operation;

import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberLeftException;
import com.hazelcast.jet.DAG;
import com.hazelcast.jet.TopologyChangedException;
import com.hazelcast.jet.impl.JetService;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.spi.Operation;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/* loaded from: input_file:com/hazelcast/jet/impl/operation/ExecuteJobOperation.class */
public class ExecuteJobOperation extends AsyncExecutionOperation {
    // Job definition. Written by writeInternal, restored by readInternal, and passed to
    // JetService.createExecutionPlans in doRun.
    private DAG dag;
    // Aggregate future of the ExecuteOperation invocations. It has no initializer, so it stays
    // null until doRun's execute phase assigns it; cancel() reads it to abort a running job.
    // NOTE(review): presumably volatile because cancel() can run on a different thread than the
    // callback that assigns it — confirm against the caller of cancel().
    private volatile CompletableFuture<Object> executionInvocationFuture;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/operation/ExecuteJobOperation$Callback.class */
    /**
     * Adapts a pair of {@link Consumer}s to Hazelcast's {@link ExecutionCallback}, so that
     * response and failure handling can be supplied as lambdas.
     */
    public static final class Callback implements ExecutionCallback {
        /** Receives the value passed to {@link #onResponse(Object)}. */
        private final Consumer<Object> responseHandler;
        /** Receives the throwable passed to {@link #onFailure(Throwable)}. */
        private final Consumer<Throwable> failureHandler;

        /**
         * @param responseHandler invoked with the response value
         * @param failureHandler  invoked with the failure cause
         */
        private Callback(Consumer<Object> responseHandler, Consumer<Throwable> failureHandler) {
            this.responseHandler = responseHandler;
            this.failureHandler = failureHandler;
        }

        /** Forwards the response to the response handler. */
        @Override // com.hazelcast.core.ExecutionCallback
        public void onResponse(Object obj) {
            responseHandler.accept(obj);
        }

        /** Forwards the failure to the failure handler. */
        @Override // com.hazelcast.core.ExecutionCallback
        public void onFailure(Throwable th) {
            failureHandler.accept(th);
        }
    }

    /**
     * Creates an operation that will run the given DAG.
     *
     * @param j   value forwarded to the {@code AsyncExecutionOperation} constructor
     *            (presumably the execution id that the rest of this class reads as
     *            {@code executionId} — confirm against the superclass)
     * @param dag the job definition to execute
     */
    public ExecuteJobOperation(long j, DAG dag) {
        super(j);
        this.dag = dag;
    }

    /**
     * No-argument constructor that leaves {@code dag} unset.
     * NOTE(review): presumably exists so the serialization framework can instantiate the
     * operation before {@code readInternal} populates its state — confirm.
     */
    private ExecuteJobOperation() {
    }

    /**
     * Runs the job on every member of the execution plan in three asynchronous phases and sends
     * the outcome as this operation's response.
     *
     * <ol>
     *   <li><b>Init</b>: an {@code InitOperation} carrying the member's plan is invoked on each
     *       member.</li>
     *   <li><b>Execute</b>: once every init invocation succeeded, an {@code ExecuteOperation} is
     *       invoked on each member. The aggregate future is stored in
     *       {@code executionInvocationFuture} so that {@link #cancel()} can reach it.</li>
     *   <li><b>Complete</b>: a {@code CompleteOperation} carrying the execution failure (or
     *       {@code null}) is invoked on each member. It starts only after the init phase failed
     *       or every execute invocation reported done, and the execution outcome is known.</li>
     * </ol>
     *
     * The response is the execution failure if there is one, otherwise the completion failure,
     * otherwise {@code null}. A {@link MemberLeftException} is reported as a
     * {@link TopologyChangedException}.
     *
     * @throws Exception declared by the overridden method; the visible body only throws what
     *                   the synchronous setup calls (logging, plan creation) throw
     */
    @Override // com.hazelcast.jet.impl.operation.AsyncExecutionOperation
    protected void doRun() throws Exception {
        final long startMillis = System.currentTimeMillis();
        getLogger().info("Start executing job " + this.executionId + ": " + this.dag);
        JetService jetService = (JetService) getService();
        getLogger().fine("Building execution plan for job " + this.executionId + '.');
        final Map<Member, ExecutionPlan> plans = jetService.createExecutionPlans(this.dag);
        getLogger().fine("Built execution plan for job " + this.executionId + '.');

        // Phase 1: init. The last argument (250) is the invocation try count.
        CompletableFuture<Object> initFuture = invokeOnCluster(
                plans, plan -> new InitOperation(this.executionId, plan), 250);
        // Completes (normally, with the cause) only if the init phase fails.
        CompletableFuture<Throwable> initFailure = onException(initFuture);
        // Completes once every execute invocation has fired its done callback.
        CompletableFuture<Void> executionDone = new CompletableFuture<>();

        // Phase 2: execute, chained after a successful init. The resulting stage always
        // completes normally: with null on success or with the peeled failure otherwise
        // (an init failure skips thenCompose and lands here as well).
        CompletableFuture<Throwable> executionOutcome = initFuture
                .thenCompose(ignored -> {
                    CompletableFuture<Object> executeFuture = invokeOnCluster(
                            plans, plan -> new ExecuteOperation(this.executionId), executionDone, true, 250);
                    this.executionInvocationFuture = executeFuture;
                    return executeFuture;
                })
                .handle((ignored, failure) -> failure != null ? ExceptionUtil.peel(failure) : null);

        // Phase 3: complete. Waits for "init failed OR all execute invocations done" and for the
        // execution outcome, then notifies every member (try count 3).
        CompletableFuture<Throwable> completionOutcome = CompletableFuture.anyOf(initFailure, executionDone)
                .thenCombine(executionOutcome, (ignored, failure) -> failure)
                .thenCompose(failure -> invokeOnCluster(
                        plans,
                        plan -> new CompleteOperation(this.executionId, topologyChangeOrIdentity(failure)),
                        3))
                .handle((ignored, failure) -> failure != null ? ExceptionUtil.peel(failure) : null);

        // The execution failure takes precedence over a completion failure.
        CompletableFuture<Throwable> jobOutcome = executionOutcome.thenCombine(
                completionOutcome,
                (executionFailure, completionFailure) ->
                        executionFailure == null ? completionFailure : executionFailure);

        // exceptionally(identity) turns an exceptional completion of the chain itself into a
        // regular outcome value, so a response is sent in every case.
        jobOutcome.exceptionally(Function.identity()).thenAccept(outcome -> {
            getLogger().info("Execution of job " + this.executionId + " completed in "
                    + (System.currentTimeMillis() - startMillis) + "ms.");
            doSendResponse(topologyChangeOrIdentity(outcome));
        });
    }

    /**
     * Translates a member-departure failure into a topology-change failure.
     *
     * @param th the failure to inspect; may be {@code null}
     * @return a new {@link TopologyChangedException} wrapping {@code th} when it is a
     *         {@link MemberLeftException}; otherwise {@code th} itself (including {@code null})
     */
    private static Throwable topologyChangeOrIdentity(Throwable th) {
        if (th instanceof MemberLeftException) {
            return new TopologyChangedException("Topology has been changed", th);
        }
        return th;
    }

    /**
     * Convenience overload: invokes an operation on every member of the plan map without
     * exposing a done-signal and without cancelling sibling invocations on failure.
     *
     * @param map      execution plan per target member
     * @param function builds the operation to send from a member's plan
     * @param i        try count applied to each invocation
     * @return a future aggregating all invocations
     */
    private CompletableFuture<Object> invokeOnCluster(Map<Member, ExecutionPlan> map, Function<ExecutionPlan, Operation> function, int i) {
        // Nobody observes this done-signal; it only satisfies the full overload's signature.
        CompletableFuture<Void> unobservedDoneSignal = new CompletableFuture<>();
        final boolean cancelOthersOnFailure = false;
        return invokeOnCluster(map, function, unobservedDoneSignal, cancelOthersOnFailure, i);
    }

    /**
     * Invokes an operation on every member of the plan map and aggregates the invocations.
     *
     * @param map               execution plan per target member
     * @param function          builds the operation to send from a member's plan
     * @param completableFuture completed with {@code null} once every invocation has fired its
     *                          done callback; completed immediately when {@code map} is empty
     * @param z                 forwarded to {@code allOf}: whether a failure of one invocation
     *                          cancels all of them
     * @param i                 try count applied to each invocation
     * @return a future aggregating all invocations
     */
    private CompletableFuture<Object> invokeOnCluster(Map<Member, ExecutionPlan> map, Function<ExecutionPlan, Operation> function, CompletableFuture<Void> completableFuture, boolean z, int i) {
        if (map.isEmpty()) {
            // With no invocations no done callback would ever fire, so signal "done" right away
            // instead of leaving the future incomplete forever.
            completableFuture.complete(null);
        }
        AtomicInteger pendingDoneCallbacks = new AtomicInteger(map.size());
        Collection<ICompletableFuture> invocations = map.entrySet().stream()
                .<ICompletableFuture>map(entry -> getNodeEngine().getOperationService()
                        .createInvocationBuilder(
                                JetService.SERVICE_NAME,
                                function.apply(entry.getValue()),
                                entry.getKey().getAddress())
                        .setDoneCallback(() -> {
                            // The last done callback to arrive releases the done-signal.
                            if (pendingDoneCallbacks.decrementAndGet() == 0) {
                                completableFuture.complete(null);
                            }
                        })
                        .setTryCount(i)
                        .invoke())
                .collect(Collectors.toList());
        return allOf(invocations, z);
    }

    /**
     * Cancels the execute phase if it has started. Does nothing while no execute-phase future
     * has been recorded by {@code doRun}.
     */
    @Override // com.hazelcast.jet.impl.operation.AsyncExecutionOperation
    public void cancel() {
        if (this.executionInvocationFuture == null) {
            // Execute phase not reached yet: there is nothing to cancel.
            return;
        }
        getLogger().info("Cancelling job " + this.executionId);
        this.executionInvocationFuture.cancel(true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Serializes the superclass state followed by the DAG.
     *
     * @param objectDataOutput destination stream
     * @throws IOException if writing fails
     */
    @Override // com.hazelcast.jet.impl.operation.AsyncExecutionOperation, com.hazelcast.spi.Operation
    public void writeInternal(ObjectDataOutput objectDataOutput) throws IOException {
        // Superclass state goes first; readInternal consumes the stream in the same order.
        super.writeInternal(objectDataOutput);
        final DAG jobDag = this.dag;
        objectDataOutput.writeObject(jobDag);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Restores the superclass state followed by the DAG.
     *
     * @param objectDataInput source stream
     * @throws IOException if reading fails
     */
    @Override // com.hazelcast.jet.impl.operation.AsyncExecutionOperation, com.hazelcast.spi.Operation
    public void readInternal(ObjectDataInput objectDataInput) throws IOException {
        // Mirrors writeInternal: superclass state first, then the DAG.
        super.readInternal(objectDataInput);
        final Object deserialized = objectDataInput.readObject();
        this.dag = (DAG) deserialized;
    }

    /**
     * Aggregates Hazelcast futures into a single {@link CompletableFuture}.
     *
     * <p>The returned future completes only after every member future has reported a response
     * or a failure: with {@code null} if none failed, otherwise exceptionally with the first
     * failure that was recorded. An empty collection yields an already-completed future.
     * Cancelling the returned future cancels every member future.
     *
     * @param collection the futures to aggregate
     * @param z          when {@code true}, the failure of any member future cancels all
     *                   member futures
     * @return the aggregate future
     */
    private static CompletableFuture<Object> allOf(Collection<ICompletableFuture> collection, boolean z) {
        CompletableFuture<Object> aggregate = new CompletableFuture<>();
        if (collection.isEmpty()) {
            // No callback would ever run, so the aggregate would stay incomplete forever.
            aggregate.complete(null);
            return aggregate;
        }
        // Propagate a cancellation of the aggregate down to every member future.
        aggregate.whenComplete((ignored, failure) -> {
            if (failure instanceof CancellationException) {
                collection.forEach(future -> future.cancel(true));
            }
        });
        AtomicInteger remaining = new AtomicInteger(collection.size());
        AtomicReference<Throwable> firstFailure = new AtomicReference<>();
        collection.forEach(future -> future.andThen(new Callback(
                response -> {
                    // The last future to report decides the aggregate outcome.
                    if (remaining.decrementAndGet() == 0) {
                        Throwable recorded = firstFailure.get();
                        if (recorded == null) {
                            aggregate.complete(null);
                        } else {
                            aggregate.completeExceptionally(recorded);
                        }
                    }
                },
                failure -> {
                    // Only the first failure is kept; later ones are dropped.
                    firstFailure.compareAndSet(null, failure);
                    if (z) {
                        collection.forEach(sibling -> sibling.cancel(true));
                    }
                    if (remaining.decrementAndGet() == 0) {
                        aggregate.completeExceptionally(firstFailure.get());
                    }
                })));
        return aggregate;
    }

    /**
     * Derives a future that signals the failure of another future as a regular value.
     *
     * @param completableFuture the future to observe
     * @param <T>               result type of the observed future
     * @return a future that completes normally with the throwable if {@code completableFuture}
     *         completes exceptionally, and never completes if it completes normally
     */
    private static <T> CompletableFuture<Throwable> onException(CompletableFuture<T> completableFuture) {
        CompletableFuture<Throwable> failureSignal = new CompletableFuture<>();
        completableFuture.whenComplete((ignored, failure) -> {
            // A successful completion is deliberately left unreported.
            if (failure != null) {
                failureSignal.complete(failure);
            }
        });
        return failureSignal;
    }
}
