package com.hazelcast.jet.impl;

import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.LocalMemberResetException;
import com.hazelcast.core.MemberLeftException;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingService;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.exception.TargetDisconnectedException;
import com.hazelcast.spi.exception.TargetNotMemberException;
import com.hazelcast.spi.serialization.SerializationService;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/AbstractJobProxy.class */
public abstract class AbstractJobProxy<T> implements Job {
    private final long jobId;
    private final T container;
    private volatile JobConfig jobConfig;
    private final NonCompletableFuture future = new NonCompletableFuture();
    private final AtomicBoolean joinedJob = new AtomicBoolean();
    private final ExecutionCallback<Void> joinJobCallback = new JoinJobCallback();
    private final Supplier<Long> submissionTimeSup = Util.memoizeConcurrent(this::doGetJobSubmissionTime);
    private final ILogger logger = loggingService().getLogger(Job.class);

    /* loaded from: input_file:com/hazelcast/jet/impl/AbstractJobProxy$JoinJobCallback.class */
    private class JoinJobCallback implements ExecutionCallback<Void> {
        private JoinJobCallback() {
        }

        @Override // com.hazelcast.core.ExecutionCallback
        public void onResponse(Void r3) {
            AbstractJobProxy.this.future.internalComplete();
        }

        @Override // com.hazelcast.core.ExecutionCallback
        public synchronized void onFailure(Throwable th) {
            Throwable peel = ExceptionUtil.peel(th);
            if (peel instanceof LocalMemberResetException) {
                String str = "Job " + AbstractJobProxy.this.idAndName() + " failed because the cluster is performing a split-brain merge";
                AbstractJobProxy.this.logger.warning(str, peel);
                AbstractJobProxy.this.future.internalCompleteExceptionally(new CancellationException(str));
            } else {
                if (!AbstractJobProxy.this.isRestartable(peel)) {
                    AbstractJobProxy.this.future.internalCompleteExceptionally(peel);
                    return;
                }
                try {
                    rejoinJob(th);
                } catch (Exception e) {
                    AbstractJobProxy.this.future.internalCompleteExceptionally(ExceptionUtil.peel(e));
                }
            }
        }

        private void rejoinJob(Throwable th) {
            if (AbstractJobProxy.this.masterAddress() != null) {
                AbstractJobProxy.this.logger.fine("Rejoining to job " + AbstractJobProxy.this.idAndName() + " after " + th.getClass().getSimpleName());
                AbstractJobProxy.this.doInvokeJoinJob();
            } else {
                String str = "Job " + AbstractJobProxy.this.idAndName() + " failed because the cluster is performing  split-brain merge and coordinator is not known";
                AbstractJobProxy.this.logger.warning(str, th);
                AbstractJobProxy.this.future.internalCompleteExceptionally(new CancellationException(str));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/AbstractJobProxy$SubmitJobCallback.class */
    public class SubmitJobCallback implements ExecutionCallback<Void> {
        private final CompletableFuture<Void> future;
        private final DAG dag;
        private final JobConfig config;

        SubmitJobCallback(CompletableFuture<Void> completableFuture, DAG dag, JobConfig jobConfig) {
            this.future = completableFuture;
            this.dag = dag;
            this.config = jobConfig;
        }

        @Override // com.hazelcast.core.ExecutionCallback
        public void onResponse(Void r4) {
            this.future.complete(null);
        }

        @Override // com.hazelcast.core.ExecutionCallback
        public synchronized void onFailure(Throwable th) {
            Throwable peel = ExceptionUtil.peel(th);
            if (peel instanceof LocalMemberResetException) {
                String str = "Submission of job " + AbstractJobProxy.this.idAndName() + " failed because the cluster is performing split-brain merge";
                AbstractJobProxy.this.logger.warning(str, peel);
                this.future.completeExceptionally(new CancellationException(str));
            } else {
                if (!AbstractJobProxy.this.isRestartable(peel)) {
                    this.future.completeExceptionally(peel);
                    return;
                }
                try {
                    resubmitJob(th);
                } catch (Exception e) {
                    this.future.completeExceptionally(ExceptionUtil.peel(e));
                }
            }
        }

        private void resubmitJob(Throwable th) {
            if (AbstractJobProxy.this.masterAddress() != null) {
                AbstractJobProxy.this.logger.fine("Resubmitting job " + AbstractJobProxy.this.idAndName() + " after " + th.getClass().getSimpleName());
                AbstractJobProxy.this.invokeSubmitJob(AbstractJobProxy.this.serializationService().toData(this.dag), this.config).andThen(this);
            } else {
                String str = "Job " + AbstractJobProxy.this.idAndName() + " failed because the cluster is performing  split-brain merge and coordinator is not known";
                AbstractJobProxy.this.logger.warning(str, th);
                this.future.completeExceptionally(new CancellationException(str));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractJobProxy(T t, long j) {
        this.jobId = j;
        this.container = t;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractJobProxy(T t, long j, DAG dag, JobConfig jobConfig) {
        this.jobId = j;
        this.container = t;
        try {
            doSubmitJob(dag, jobConfig);
            this.joinedJob.set(true);
            doInvokeJoinJob();
        } catch (Throwable th) {
            throw ExceptionUtil.rethrow(th);
        }
    }

    @Override // com.hazelcast.jet.Job
    public long getId() {
        return this.jobId;
    }

    @Override // com.hazelcast.jet.Job
    @Nonnull
    public JobConfig getConfig() {
        JobConfig jobConfig = this.jobConfig;
        if (jobConfig != null) {
            return jobConfig;
        }
        synchronized (this) {
            if (this.jobConfig != null) {
                return this.jobConfig;
            }
            this.jobConfig = doGetJobConfig();
            if (this.jobConfig == null) {
                throw new NullPointerException("Supplier returned null");
            }
            return this.jobConfig;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String idAndName() {
        String str;
        JobConfig jobConfig = this.jobConfig;
        StringBuilder append = new StringBuilder().append(getIdString()).append(" (name ");
        if (jobConfig != null) {
            str = "'" + (jobConfig.getName() != null ? jobConfig.getName() : "") + "'";
        } else {
            str = "??";
        }
        return append.append(str).append(')').toString();
    }

    @Override // com.hazelcast.jet.Job
    @Nonnull
    public CompletableFuture<Void> getFuture() {
        if (this.joinedJob.compareAndSet(false, true)) {
            doInvokeJoinJob();
        }
        return this.future;
    }

    @Override // com.hazelcast.jet.Job
    public long getSubmissionTime() {
        return this.submissionTimeSup.get().longValue();
    }

    @Override // com.hazelcast.jet.Job
    public void cancel() {
        terminate(TerminationMode.CANCEL_FORCEFUL);
    }

    @Override // com.hazelcast.jet.Job
    public void restart() {
        terminate(TerminationMode.RESTART_GRACEFUL);
    }

    @Override // com.hazelcast.jet.Job
    public void suspend() {
        terminate(TerminationMode.SUSPEND_GRACEFUL);
    }

    private void terminate(TerminationMode terminationMode) {
        this.logger.fine("Sending " + terminationMode + " request for job " + idAndName());
        while (true) {
            try {
                invokeTerminateJob(terminationMode).get();
                return;
            } catch (Exception e) {
                if (!isRestartable(e)) {
                    throw ExceptionUtil.rethrow(e);
                }
                this.logger.fine("Re-sending " + terminationMode + " request for job " + idAndName());
            }
        }
    }

    public String toString() {
        return "Job{id=" + getIdString() + ", name=" + getName() + ", submissionTime=" + Util.toLocalDateTime(getSubmissionTime()) + ", status=" + getStatus() + "}";
    }

    protected abstract ICompletableFuture<Void> invokeSubmitJob(Data data, JobConfig jobConfig);

    protected abstract ICompletableFuture<Void> invokeJoinJob();

    protected abstract ICompletableFuture<Void> invokeTerminateJob(TerminationMode terminationMode);

    protected abstract long doGetJobSubmissionTime();

    protected abstract JobConfig doGetJobConfig();

    protected abstract Address masterAddress();

    protected abstract SerializationService serializationService();

    protected abstract LoggingService loggingService();

    /* JADX INFO: Access modifiers changed from: protected */
    public T container() {
        return this.container;
    }

    private void doSubmitJob(DAG dag, JobConfig jobConfig) {
        CompletableFuture completableFuture = new CompletableFuture();
        invokeSubmitJob(serializationService().toData(dag), jobConfig).andThen(new SubmitJobCallback(completableFuture, dag, jobConfig));
        completableFuture.join();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isRestartable(Throwable th) {
        return (th instanceof MemberLeftException) || (th instanceof TargetDisconnectedException) || (th instanceof TargetNotMemberException);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doInvokeJoinJob() {
        invokeJoinJob().andThen(this.joinJobCallback);
    }
}
