package com.hazelcast.jet.impl;

import com.hazelcast.cluster.memberselector.MemberSelectors;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.internal.partition.impl.InternalPartitionServiceImpl;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.JobNotFoundException;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.impl.deployment.JetClassLoader;
import com.hazelcast.jet.impl.execution.SnapshotRecord;
import com.hazelcast.jet.impl.util.JetGroupProperty;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.executionservice.InternalExecutionService;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.util.Clock;
import com.hazelcast.util.executor.ExecutorType;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/* loaded from: input_file:com/hazelcast/jet/impl/JobCoordinationService.class */
public class JobCoordinationService {
    private static final String COORDINATOR_EXECUTOR_NAME = "jet:coordinator";
    private static final long RETRY_DELAY_IN_MILLIS = TimeUnit.SECONDS.toMillis(2);
    private final NodeEngineImpl nodeEngine;
    private final JetConfig config;
    private final ILogger logger;
    private final JobRepository jobRepository;
    private final JobExecutionService jobExecutionService;
    private final SnapshotRepository snapshotRepository;
    private final ConcurrentMap<Long, MasterContext> masterContexts = new ConcurrentHashMap();

    public JobCoordinationService(NodeEngineImpl nodeEngineImpl, JetConfig jetConfig, JobRepository jobRepository, JobExecutionService jobExecutionService, SnapshotRepository snapshotRepository) {
        this.nodeEngine = nodeEngineImpl;
        this.config = jetConfig;
        this.logger = nodeEngineImpl.getLogger(getClass());
        this.jobRepository = jobRepository;
        this.jobExecutionService = jobExecutionService;
        this.snapshotRepository = snapshotRepository;
    }

    public void init() {
        InternalExecutionService executionService = this.nodeEngine.getExecutionService();
        long millis = new HazelcastProperties(this.config.getProperties()).getMillis(JetGroupProperty.JOB_SCAN_PERIOD);
        executionService.register(COORDINATOR_EXECUTOR_NAME, 2, Integer.MAX_VALUE, ExecutorType.CACHED);
        executionService.scheduleWithRepetition(COORDINATOR_EXECUTOR_NAME, this::scanJobs, millis, millis, TimeUnit.MILLISECONDS);
    }

    public void reset() {
        this.masterContexts.values().forEach((v0) -> {
            v0.cancelJob();
        });
    }

    public ClassLoader getClassLoader(long j) {
        return this.jobExecutionService.getClassLoader(j, () -> {
            JobConfig jobConfig = getJobConfig(j);
            return new JetClassLoader(jobConfig.getClassLoaderFactory() != null ? jobConfig.getClassLoaderFactory().getJobClassLoader() : null, this.jobRepository.getJobResources(j));
        });
    }

    public Map<Long, MasterContext> getMasterContexts() {
        return new HashMap(this.masterContexts);
    }

    public MasterContext getMasterContext(long j) {
        return this.masterContexts.get(Long.valueOf(j));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void updateQuorumValues() {
        if (shouldCheckQuorumValues()) {
            try {
                int quorumSize = getQuorumSize();
                for (JobRecord jobRecord : this.jobRepository.getJobRecords()) {
                    if (jobRecord.getConfig().isSplitBrainProtectionEnabled() && quorumSize > jobRecord.getQuorumSize() && this.jobRepository.updateJobQuorumSizeIfLargerThanCurrent(jobRecord.getJobId(), quorumSize)) {
                        this.logger.info("Current quorum size: " + jobRecord.getQuorumSize() + " of job " + Util.idToString(jobRecord.getJobId()) + " is updated to: " + quorumSize);
                    }
                }
            } catch (Exception e) {
                this.logger.fine("check quorum values task failed", e);
            }
        }
    }

    private boolean shouldCheckQuorumValues() {
        return isMaster() && this.nodeEngine.isRunning() && getInternalPartitionService().getPartitionStateManager().isInitialized();
    }

    public CompletableFuture<Void> submitOrJoinJob(long j, Data data, JobConfig jobConfig) {
        if (!isMaster()) {
            throw new JetException("Cannot submit Job " + Util.idToString(j) + ". Master address: " + this.nodeEngine.getClusterService().getMasterAddress());
        }
        JobResult jobResult = this.jobRepository.getJobResult(j);
        if (jobResult != null) {
            this.logger.fine("Not starting job " + Util.idToString(j) + " since already completed with result: " + jobResult);
            return jobResult.asCompletableFuture();
        }
        JobRecord jobRecord = new JobRecord(j, Clock.currentTimeMillis(), data, jobConfig, jobConfig.isSplitBrainProtectionEnabled() ? getQuorumSize() : 0);
        MasterContext masterContext = new MasterContext(this.nodeEngine, this, jobRecord);
        MasterContext putIfAbsent = this.masterContexts.putIfAbsent(Long.valueOf(j), masterContext);
        if (putIfAbsent != null) {
            this.logger.fine("Joining to already started job " + Util.idToString(j));
            return putIfAbsent.completionFuture();
        }
        if (completeMasterContextIfJobAlreadyCompleted(masterContext)) {
            return masterContext.completionFuture();
        }
        this.jobRepository.putNewJobRecord(jobRecord);
        this.logger.info("Starting job " + Util.idToString(j) + " based on submit request from client");
        this.nodeEngine.getExecutionService().execute(COORDINATOR_EXECUTOR_NAME, () -> {
            tryStartJob(masterContext);
        });
        return masterContext.completionFuture();
    }

    public CompletableFuture<Void> joinSubmittedJob(long j) {
        if (!isMaster()) {
            throw new JetException("Cannot join Job " + Util.idToString(j) + ". Master address: " + this.nodeEngine.getClusterService().getMasterAddress());
        }
        JobRecord jobRecord = this.jobRepository.getJobRecord(j);
        if (jobRecord != null) {
            return submitOrJoinJob(j, jobRecord.getDag(), jobRecord.getConfig());
        }
        JobResult jobResult = this.jobRepository.getJobResult(j);
        if (jobResult != null) {
            return jobResult.asCompletableFuture();
        }
        throw new JobNotFoundException(j);
    }

    private void startJobIfNotStartedOrCompleted(JobRecord jobRecord) {
        long jobId = jobRecord.getJobId();
        if (this.jobRepository.getJobResult(jobId) != null || this.masterContexts.containsKey(Long.valueOf(jobId))) {
            return;
        }
        MasterContext masterContext = new MasterContext(this.nodeEngine, this, jobRecord);
        if (this.masterContexts.putIfAbsent(Long.valueOf(jobId), masterContext) == null && !completeMasterContextIfJobAlreadyCompleted(masterContext)) {
            this.logger.info("Starting job " + Util.idToString(masterContext.getJobId()) + " discovered by scanning of JobRecords");
            tryStartJob(masterContext);
        }
    }

    private boolean completeMasterContextIfJobAlreadyCompleted(MasterContext masterContext) {
        long jobId = masterContext.getJobId();
        JobResult jobResult = this.jobRepository.getJobResult(jobId);
        if (jobResult != null) {
            this.logger.fine("Completing master context " + Util.idToString(jobId) + " since already completed with result: " + jobResult);
            masterContext.setFinalResult(jobResult.getFailure());
            return this.masterContexts.remove(Long.valueOf(jobId), masterContext);
        }
        if (masterContext.getJobConfig().isAutoRestartOnMemberFailureEnabled() || this.jobRepository.getExecutionIdCount(jobId) <= 0) {
            return false;
        }
        String thisUuid = this.nodeEngine.getNode().getThisUuid();
        TopologyChangedException topologyChangedException = new TopologyChangedException();
        this.logger.info("Completing Job " + Util.idToString(jobId) + " with " + topologyChangedException + " since auto-restart is disabled and the job has been executed before");
        this.jobRepository.completeJob(jobId, thisUuid, System.currentTimeMillis(), topologyChangedException);
        masterContext.setFinalResult(topologyChangedException);
        return this.masterContexts.remove(Long.valueOf(jobId), masterContext);
    }

    private void tryStartJob(MasterContext masterContext) {
        JobRepository jobRepository = this.jobRepository;
        jobRepository.getClass();
        masterContext.tryStartJob((v1) -> {
            return r1.newExecutionId(v1);
        });
    }

    private int getQuorumSize() {
        return (getDataMemberCount() / 2) + 1;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isQuorumPresent(int i) {
        return getDataMemberCount() >= i;
    }

    private int getDataMemberCount() {
        return this.nodeEngine.getClusterService().getMembers(MemberSelectors.DATA_MEMBER_SELECTOR).size();
    }

    public void cancelJob(long j) {
        if (!isMaster()) {
            throw new JetException("Cannot cancel Job " + Util.idToString(j) + ". Master address: " + this.nodeEngine.getClusterService().getMasterAddress());
        }
        if (this.jobRepository.getJobResult(j) != null) {
            this.logger.fine("Cannot cancel Job " + Util.idToString(j) + " because it already has a result");
            return;
        }
        MasterContext masterContext = this.masterContexts.get(Long.valueOf(j));
        if (masterContext == null) {
            throw new RetryableHazelcastException("No MasterContext found for Job " + Util.idToString(j) + " to cancel");
        }
        if (masterContext.isCancelled()) {
            this.logger.info("Job " + Util.idToString(j) + " is already cancelling...");
        } else {
            this.logger.info("Job " + Util.idToString(j) + " cancellation is triggered");
            masterContext.cancelJob();
        }
    }

    public Set<Long> getAllJobIds() {
        HashSet hashSet = new HashSet(this.jobRepository.getAllJobIds());
        hashSet.addAll(this.masterContexts.keySet());
        return hashSet;
    }

    public JobStatus getJobStatus(long j) {
        if (!isMaster()) {
            throw new JetException("Cannot query status of Job " + Util.idToString(j) + ". Master address: " + this.nodeEngine.getClusterService().getMasterAddress());
        }
        JobResult jobResult = this.jobRepository.getJobResult(j);
        if (jobResult != null) {
            return jobResult.getJobStatus();
        }
        MasterContext masterContext = this.masterContexts.get(Long.valueOf(j));
        if (masterContext != null) {
            JobStatus jobStatus = masterContext.jobStatus();
            return jobStatus == JobStatus.RUNNING ? masterContext.isCancelled() ? JobStatus.COMPLETING : JobStatus.RUNNING : jobStatus;
        }
        if (this.jobRepository.getJobRecord(j) != null) {
            return JobStatus.NOT_STARTED;
        }
        JobResult jobResult2 = this.jobRepository.getJobResult(j);
        if (jobResult2 != null) {
            return jobResult2.getJobStatus();
        }
        throw new JobNotFoundException(j);
    }

    public long getJobSubmissionTime(long j) {
        if (!isMaster()) {
            throw new JetException("Cannot query submission time of Job " + Util.idToString(j) + ". Master address: " + this.nodeEngine.getClusterService().getMasterAddress());
        }
        JobRecord jobRecord = this.jobRepository.getJobRecord(j);
        if (jobRecord != null) {
            return jobRecord.getCreationTime();
        }
        JobResult jobResult = this.jobRepository.getJobResult(j);
        if (jobResult != null) {
            return jobResult.getCreationTime();
        }
        throw new JobNotFoundException(j);
    }

    public JobConfig getJobConfig(long j) {
        JobRecord jobRecord = this.jobRepository.getJobRecord(j);
        if (jobRecord != null) {
            return jobRecord.getConfig();
        }
        JobResult jobResult = this.jobRepository.getJobResult(j);
        if (jobResult != null) {
            return jobResult.getJobConfig();
        }
        throw new JobNotFoundException(j);
    }

    public boolean restartJobExecution(long j) {
        MasterContext masterContext = this.masterContexts.get(Long.valueOf(j));
        if (masterContext != null) {
            boolean restartExecution = masterContext.restartExecution();
            if (restartExecution) {
                this.logger.info("Job " + Util.idToString(j) + " is going to be restarted");
            } else {
                this.logger.warning("Cannot restart job " + Util.idToString(j) + " because it is not currently being executed");
            }
            return restartExecution;
        }
        JobResult jobResult = this.jobRepository.getJobResult(j);
        if (jobResult != null) {
            throw new IllegalStateException("Cannot restart job " + Util.idToString(j) + " because it is already " + jobResult.getJobStatus());
        }
        if (this.jobRepository.getJobRecord(j) == null) {
            throw new IllegalStateException("Cannot restart job " + Util.idToString(j) + " because JobRecord was not found");
        }
        this.logger.warning("Cannot restart job " + Util.idToString(j) + " because it is not initialized yet");
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SnapshotRepository snapshotRepository() {
        return this.snapshotRepository;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completeJob(MasterContext masterContext, long j, long j2, Throwable th) {
        long jobId = masterContext.getJobId();
        this.jobRepository.completeJob(jobId, this.nodeEngine.getNode().getThisUuid(), j2, th);
        if (this.masterContexts.remove(Long.valueOf(masterContext.getJobId()), masterContext)) {
            this.logger.fine(Util.jobAndExecutionId(jobId, j) + " is completed");
            return;
        }
        MasterContext masterContext2 = this.masterContexts.get(Long.valueOf(jobId));
        if (masterContext2 != null) {
            this.logger.severe("Different master context found to complete " + Util.jobAndExecutionId(jobId, j) + ", master context execution " + Util.idToString(masterContext2.getExecutionId()));
        } else {
            this.logger.severe("No master context found to complete " + Util.jobAndExecutionId(jobId, j));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void scheduleRestart(long j) {
        if (this.masterContexts.get(Long.valueOf(j)) == null) {
            this.logger.severe("Master context for job " + Util.idToString(j) + " not found to schedule restart");
        } else {
            this.logger.fine("Scheduling restart on master for job " + Util.idToString(j));
            this.nodeEngine.getExecutionService().schedule(COORDINATOR_EXECUTOR_NAME, () -> {
                restartJob(j);
            }, RETRY_DELAY_IN_MILLIS, TimeUnit.MILLISECONDS);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void scheduleSnapshot(long j, long j2) {
        MasterContext masterContext = this.masterContexts.get(Long.valueOf(j));
        if (masterContext == null) {
            this.logger.warning("MasterContext not found to schedule snapshot of " + Util.jobAndExecutionId(j, j2));
            return;
        }
        long snapshotIntervalMillis = masterContext.getJobConfig().getSnapshotIntervalMillis();
        InternalExecutionService executionService = this.nodeEngine.getExecutionService();
        if (this.logger.isFineEnabled()) {
            this.logger.fine(Util.jobAndExecutionId(j, j2) + " snapshot is scheduled in " + snapshotIntervalMillis + "ms");
        }
        executionService.schedule(COORDINATOR_EXECUTOR_NAME, () -> {
            beginSnapshot(j, j2);
        }, snapshotIntervalMillis, TimeUnit.MILLISECONDS);
    }

    private void beginSnapshot(long j, long j2) {
        MasterContext masterContext = this.masterContexts.get(Long.valueOf(j));
        if (masterContext == null) {
            this.logger.warning("MasterContext not found to schedule snapshot of " + Util.jobAndExecutionId(j, j2));
            return;
        }
        if (masterContext.completionFuture().isDone() || masterContext.isCancelled() || masterContext.jobStatus() != JobStatus.RUNNING) {
            this.logger.fine("Not starting snapshot since " + Util.jobAndExecutionId(j, j2) + " is done.");
        } else if (shouldStartJobs()) {
            masterContext.beginSnapshot(j2);
        } else {
            scheduleSnapshot(j, j2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completeSnapshot(long j, long j2, long j3, boolean z) {
        if (this.masterContexts.get(Long.valueOf(j)) == null) {
            this.logger.warning("MasterContext not found to finalize snapshot of " + Util.jobAndExecutionId(j, j2) + " with result: " + z);
            return;
        }
        try {
            SnapshotRecord.SnapshotStatus snapshotStatus = z ? SnapshotRecord.SnapshotStatus.SUCCESSFUL : SnapshotRecord.SnapshotStatus.FAILED;
            this.logger.info(String.format("Snapshot %s for job %s completed with status %s in %dms", Long.valueOf(j3), Util.idToString(j), snapshotStatus, Long.valueOf(this.snapshotRepository.setSnapshotStatus(j, j3, snapshotStatus))));
            try {
                if (z) {
                    this.snapshotRepository.deleteAllSnapshotsExceptOne(j, Long.valueOf(j3));
                } else {
                    this.snapshotRepository.deleteSingleSnapshot(j, Long.valueOf(j3));
                }
            } catch (Exception e) {
                this.logger.warning("Cannot delete old snapshots for " + Util.jobAndExecutionId(j, j2));
            }
            scheduleSnapshot(j, j2);
        } catch (Exception e2) {
            this.logger.warning("Cannot update snapshot status for " + Util.jobAndExecutionId(j, j2) + " snapshot " + j3 + " isSuccess: " + z);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean shouldStartJobs() {
        if (!isMaster() || !this.nodeEngine.isRunning()) {
            return false;
        }
        InternalPartitionServiceImpl internalPartitionService = getInternalPartitionService();
        return internalPartitionService.getPartitionStateManager().isInitialized() && internalPartitionService.isMigrationAllowed() && !internalPartitionService.hasOnGoingMigrationLocal();
    }

    public List<Long> getJobIds(String str) {
        HashMap hashMap = new HashMap();
        this.jobRepository.getJobRecords(str).forEach(jobRecord -> {
        });
        this.masterContexts.values().stream().filter(masterContext -> {
            return str.equals(masterContext.getJobConfig().getName());
        }).forEach(masterContext2 -> {
        });
        this.jobRepository.getJobResults(str).forEach(jobResult -> {
        });
        return (List) hashMap.entrySet().stream().sorted(Comparator.comparing((v0) -> {
            return v0.getValue();
        }).reversed()).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toList());
    }

    private InternalPartitionServiceImpl getInternalPartitionService() {
        return (InternalPartitionServiceImpl) this.nodeEngine.getNode().getPartitionService();
    }

    private void restartJob(long j) {
        MasterContext masterContext = this.masterContexts.get(Long.valueOf(j));
        if (masterContext == null) {
            this.logger.severe("Master context for job " + Util.idToString(j) + " not found to restart");
        } else if (masterContext.isCancelled()) {
            tryStartJob(masterContext);
        } else {
            tryStartJob(masterContext);
        }
    }

    private void scanJobs() {
        if (shouldStartJobs()) {
            try {
                this.jobRepository.getJobRecords().forEach(this::startJobIfNotStartedOrCompleted);
                performCleanup();
            } catch (Exception e) {
                if (e instanceof HazelcastInstanceNotActiveException) {
                    return;
                }
                this.logger.severe("Scanning jobs failed", e);
            }
        }
    }

    private void performCleanup() {
        this.jobRepository.cleanup(this.masterContexts.keySet());
    }

    private boolean isMaster() {
        return this.nodeEngine.getClusterService().isMaster();
    }
}
