package com.hazelcast.jet.impl;

import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.datamodel.Tuple3;
import com.hazelcast.jet.impl.JobExecutionRecord;
import com.hazelcast.jet.impl.execution.SnapshotFlags;
import com.hazelcast.jet.impl.operation.SnapshotPhase1Operation;
import com.hazelcast.jet.impl.operation.SnapshotPhase2Operation;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/hazelcast/jet/impl/MasterSnapshotContext.class */
/**
 * Master-side bookkeeping for the snapshots of a single job.
 *
 * <p>Snapshot requests are queued with {@link #enqueueSnapshot} and started one
 * at a time by {@link #tryBeginSnapshot()}. A snapshot runs in two phases: a
 * {@link SnapshotPhase1Operation} is invoked on the participants, the merged
 * results are recorded, and then a {@link SnapshotPhase2Operation} is invoked.
 * When the phase 2 responses have been processed, the next queued snapshot (if
 * any) is attempted.
 *
 * <p>This source was recovered by a decompiler, which widened several members
 * from package-private to public; the visibility is kept as found.
 */
public class MasterSnapshotContext {
    final MasterContext mc;
    private final ILogger logger;

    /**
     * Set when a queued request is picked up in {@link #tryBeginSnapshot()} and
     * cleared when its phase 2 responses are processed or a new execution starts.
     */
    private boolean snapshotInProgress;

    /**
     * Completed once phase 2 of a terminal snapshot has been processed. It starts
     * out completed and is replaced by a fresh future in {@link #onExecutionStarted()}.
     */
    @Nonnull
    private volatile CompletableFuture<Void> terminalSnapshotFuture = CompletableFuture.completedFuture(null);

    /**
     * Pending snapshot requests as (export name or {@code null}, flag passed as
     * the first argument of {@code SnapshotFlags.create}, optional future to
     * complete when the snapshot finishes).
     */
    private final Queue<Tuple3<String, Boolean, CompletableFuture<Void>>> snapshotQueue = new LinkedList<>();

    /**
     * @param masterContext the master context of the job this instance belongs to
     * @param logger        logger used for all messages emitted by this class
     */
    public MasterSnapshotContext(MasterContext masterContext, ILogger logger) {
        this.mc = masterContext;
        this.logger = logger;
    }

    /**
     * Appends a snapshot request to the queue. It does not start the snapshot;
     * call {@link #tryBeginSnapshot()} for that.
     *
     * @param snapshotMapName name under which the snapshot is exported, or
     *                        {@code null} for a regular (non-export) snapshot
     * @param isTerminal      first flag handed to {@code SnapshotFlags.create};
     *                        presumably marks a terminal snapshot (confirm
     *                        against {@code SnapshotFlags})
     * @param future          completed (normally or exceptionally) when the
     *                        snapshot finishes; may be {@code null}
     */
    public void enqueueSnapshot(String snapshotMapName, boolean isTerminal, CompletableFuture<Void> future) {
        snapshotQueue.add(Tuple3.tuple3(snapshotMapName, isTerminal, future));
    }

    /**
     * Enqueues a regular, non-export snapshot and tries to begin it, provided the
     * job is {@code RUNNING} and {@code executionId} matches the current execution.
     *
     * @param executionId the execution the snapshot was scheduled for
     */
    public void startScheduledSnapshot(long executionId) {
        mc.lock();
        try {
            if (mc.jobStatus() != JobStatus.RUNNING) {
                logger.fine("Not beginning snapshot, " + mc.jobIdString() + " is not RUNNING, but " + mc.jobStatus());
                return;
            }
            if (mc.executionId() != executionId) {
                logger.fine("Not beginning snapshot since unexpected execution ID received for " + mc.jobIdString()
                        + ". Received execution ID: " + Util.idToString(executionId));
                return;
            }
            enqueueSnapshot(null, false, null);
            tryBeginSnapshot();
        } finally {
            mc.unlock();
        }
    }

    /**
     * Submits a task to the coordinator thread that takes the next request from
     * the queue and starts phase 1 for it. Nothing is started when the job is not
     * {@code RUNNING}, a snapshot is already in progress, the terminal snapshot
     * future is already done, or the queue is empty.
     */
    public void tryBeginSnapshot() {
        mc.coordinationService().submitToCoordinatorThread(() -> {
            String requestedMapName;
            boolean isTerminal;
            CompletableFuture<Void> future;
            long executionId;

            // Only the state checks and the hand-over of the request happen under
            // the lock. The lock is released exactly once, in the finally block.
            mc.lock();
            try {
                if (mc.jobStatus() != JobStatus.RUNNING) {
                    logger.fine("Not beginning snapshot, " + mc.jobIdString() + " is not RUNNING, but " + mc.jobStatus());
                    return;
                }
                if (snapshotInProgress) {
                    logger.fine("Not beginning snapshot since one is already in progress " + mc.jobIdString());
                    return;
                }
                if (terminalSnapshotFuture.isDone()) {
                    logger.fine("Not beginning snapshot since terminal snapshot is already completed " + mc.jobIdString());
                    return;
                }
                Tuple3<String, Boolean, CompletableFuture<Void>> request = snapshotQueue.poll();
                if (request == null) {
                    return;
                }
                snapshotInProgress = true;
                requestedMapName = request.f0();
                isTerminal = request.f1();
                future = request.f2();
                mc.jobExecutionRecord().startNewSnapshot(requestedMapName);
                executionId = mc.executionId();
            } finally {
                mc.unlock();
            }

            mc.writeJobExecutionRecord(false);
            long snapshotId = mc.jobExecutionRecord().ongoingSnapshotId();
            // A request carrying a name is an export; otherwise the job's own data map is used.
            boolean isExport = requestedMapName != null;
            int snapshotFlags = SnapshotFlags.create(isTerminal, isExport);
            String targetMapName = isExport
                    ? JobRepository.exportedSnapshotMapName(requestedMapName)
                    : JobRepository.snapshotDataMapName(mc.jobId(), mc.jobExecutionRecord().ongoingDataMapIndex());
            // Start from an empty map so that no earlier content remains in the target.
            mc.nodeEngine().getHazelcastInstance().getMap(targetMapName).clear();
            LoggingUtil.logFine(logger, "Starting snapshot %d for %s, flags: %s, writing to: %s",
                    snapshotId, mc.jobIdString(), SnapshotFlags.toString(snapshotFlags), requestedMapName);
            mc.invokeOnParticipants(
                    plan -> new SnapshotPhase1Operation(mc.jobId(), executionId, snapshotId, targetMapName, snapshotFlags),
                    responses -> onSnapshotPhase1Complete(
                            responses, executionId, snapshotId, targetMapName, snapshotFlags, future),
                    null, true);
        });
    }

    /**
     * Processes the phase 1 responses on the coordinator thread: merges them,
     * writes the {@code SnapshotValidationRecord} into the snapshot map, records
     * the outcome in the job execution record and invokes phase 2.
     *
     * @param responses       per-member responses; each value is either a
     *                        {@code SnapshotPhase1Result} or a {@code Throwable}
     * @param executionId     execution the snapshot was started for
     * @param snapshotId      id of the snapshot
     * @param snapshotMapName name of the map the snapshot was written to
     * @param snapshotFlags   flags created by {@code SnapshotFlags.create}
     * @param future          optional future handed on to phase 2 completion
     */
    private void onSnapshotPhase1Complete(Collection<Map.Entry<MemberInfo, Object>> responses, long executionId,
                                          long snapshotId, String snapshotMapName, int snapshotFlags,
                                          @Nullable CompletableFuture<Void> future) {
        mc.coordinationService().submitToCoordinatorThread(() -> {
            if (executionId != mc.executionId()) {
                LoggingUtil.logFine(logger, "%s: ignoring responses for snapshot %s phase 1: the responses are from a different execution: %s. Responses: %s",
                        mc.jobIdString(), snapshotId, Util.idToString(executionId), responses);
                return;
            }

            SnapshotPhase1Operation.SnapshotPhase1Result mergedResult = new SnapshotPhase1Operation.SnapshotPhase1Result();
            for (Map.Entry<MemberInfo, Object> entry : responses) {
                Object response = entry.getValue();
                if (response instanceof Throwable) {
                    // Turn a failed invocation into an empty result carrying the error.
                    response = new SnapshotPhase1Operation.SnapshotPhase1Result(0L, 0L, 0L, (Throwable) response);
                }
                mergedResult.merge((SnapshotPhase1Operation.SnapshotPhase1Result) response);
            }

            IMap<Object, Object> snapshotMap = mc.nodeEngine().getHazelcastInstance().getMap(snapshotMapName);
            try {
                SnapshotValidationRecord validationRecord = new SnapshotValidationRecord(
                        snapshotId, mergedResult.getNumChunks(), mergedResult.getNumBytes(),
                        mc.jobExecutionRecord().ongoingSnapshotStartTime(), mc.jobId(), mc.jobName(),
                        mc.jobRecord().getDagJson());
                Object previous = snapshotMap.put(SnapshotValidationRecord.KEY, validationRecord);
                if (snapshotMapName.startsWith(JobRepository.EXPORTED_SNAPSHOTS_PREFIX)) {
                    String exportName = snapshotMapName.substring(JobRepository.EXPORTED_SNAPSHOTS_PREFIX.length());
                    mc.jobRepository().cacheValidationRecord(exportName, validationRecord);
                }
                if (previous != null) {
                    logger.severe("SnapshotValidationRecord overwritten after writing to '" + snapshotMapName + "' for "
                            + mc.jobIdString() + ": snapshot data might be corrupted");
                }
            } catch (Exception e) {
                // A failure to write the validation record fails the snapshot like a member failure does.
                mergedResult.merge(new SnapshotPhase1Operation.SnapshotPhase1Result(0L, 0L, 0L, e));
            }

            boolean phase1Succeeded = mergedResult.getError() == null;
            JobExecutionRecord.SnapshotStats stats = mc.jobExecutionRecord().ongoingSnapshotDone(
                    mergedResult.getNumBytes(), mergedResult.getNumKeys(), mergedResult.getNumChunks(),
                    mergedResult.getError());
            mc.writeJobExecutionRecord(false);

            if (logger.isFineEnabled()) {
                logger.fine(String.format("Snapshot %d phase 1 for %s completed with status %s in %dms, %,d bytes, %,d keys in %,d chunks, stored in '%s', proceeding to phase 2",
                        snapshotId, mc.jobIdString(), phase1Succeeded ? "SUCCESS" : "FAILURE",
                        stats.duration(), stats.numBytes(), stats.numKeys(), stats.numChunks(), snapshotMapName));
            }
            if (!phase1Succeeded) {
                logger.warning(mc.jobIdString() + " snapshot " + snapshotId
                        + " phase 1 failed on some member(s), one of the failures: " + mergedResult.getError());
                try {
                    snapshotMap.clear();
                } catch (Exception e) {
                    logger.warning(mc.jobIdString() + ": failed to clear snapshot map '" + snapshotMapName
                            + "' after a failure", e);
                }
            }
            if (!SnapshotFlags.isExport(snapshotFlags)) {
                // NOTE(review): this runs after ongoingSnapshotDone(); presumably the "ongoing" index
                // now refers to the map that is no longer the active one - confirm in JobExecutionRecord.
                mc.jobRepository().clearSnapshotData(mc.jobId(), mc.jobExecutionRecord().ongoingDataMapIndex());
            }

            mc.invokeOnParticipants(
                    plan -> new SnapshotPhase2Operation(mc.jobId(), executionId, snapshotId,
                            phase1Succeeded && !SnapshotFlags.isExportOnly(snapshotFlags)),
                    phase2Responses -> onSnapshotPhase2Complete(mergedResult.getError(), phase2Responses,
                            executionId, snapshotId, snapshotFlags, future, stats.startTime()),
                    null, true);
        });
    }

    /**
     * Processes the phase 2 responses on the coordinator thread: logs member
     * failures, completes the requester's future, clears the in-progress state
     * and tries to begin the next queued snapshot.
     *
     * @param phase1Error   error message from phase 1, or {@code null} on success
     * @param responses     per-member phase 2 responses; a {@code Throwable}
     *                      value denotes a failure on that member
     * @param executionId   execution the snapshot was started for
     * @param snapshotId    id of the snapshot
     * @param snapshotFlags flags created by {@code SnapshotFlags.create}
     * @param future        optional future to complete with the snapshot outcome
     * @param startTime     snapshot start time, used for the duration in the log
     */
    private void onSnapshotPhase2Complete(String phase1Error, Collection<Map.Entry<MemberInfo, Object>> responses,
                                          long executionId, long snapshotId, int snapshotFlags,
                                          @Nullable CompletableFuture<Void> future, long startTime) {
        mc.coordinationService().submitToCoordinatorThread(() -> {
            if (executionId != mc.executionId()) {
                LoggingUtil.logFine(logger, "%s: ignoring responses for snapshot %s phase 2: the responses are from a different execution: %s. Responses: %s",
                        mc.jobIdString(), snapshotId, Util.idToString(executionId), responses);
                return;
            }

            for (Map.Entry<MemberInfo, Object> entry : responses) {
                if (entry.getValue() instanceof Throwable) {
                    logger.warning(SnapshotPhase2Operation.class.getSimpleName() + " for snapshot " + snapshotId
                            + " in " + mc.jobIdString() + " failed on member: " + entry, (Throwable) entry.getValue());
                }
            }

            // The outcome reported to the requester is decided by phase 1 only.
            if (future != null) {
                if (phase1Error == null) {
                    future.complete(null);
                } else {
                    future.completeExceptionally(new JetException(phase1Error));
                }
            }

            mc.lock();
            try {
                if (mc.executionId() != executionId) {
                    logger.fine("Not completing terminalSnapshotFuture on " + mc.jobIdString()
                            + ", new execution already started, snapshot was for executionId="
                            + Util.idToString(executionId));
                    return;
                }
                assert snapshotInProgress : "snapshot not in progress";
                snapshotInProgress = false;
                if (SnapshotFlags.isTerminal(snapshotFlags)) {
                    boolean completedNow = terminalSnapshotFuture.complete(null);
                    assert completedNow : "terminalSnapshotFuture was already completed";
                    if (phase1Error != null) {
                        // The terminal snapshot failed: cancel the execution invocations.
                        mc.jobContext().cancelExecutionInvocations(mc.jobId(), mc.executionId(), null);
                    }
                } else if (!SnapshotFlags.isExport(snapshotFlags)) {
                    // A regular snapshot finished: schedule the next one.
                    mc.coordinationService().scheduleSnapshot(mc, executionId);
                }
                if (logger.isFineEnabled()) {
                    logger.fine("Snapshot " + snapshotId + " for " + mc.jobIdString() + " completed in "
                            + (System.currentTimeMillis() - startTime) + "ms, status="
                            + (phase1Error == null ? "success" : "failure: " + phase1Error));
                }
                tryBeginSnapshot();
            } finally {
                mc.unlock();
            }
        });
    }

    /**
     * Returns the future that is completed when phase 2 of a terminal snapshot
     * has been processed for the current execution.
     */
    public CompletableFuture<Void> terminalSnapshotFuture() {
        return terminalSnapshotFuture;
    }

    /**
     * Resets the per-execution state: clears the in-progress flag and installs a
     * fresh, incomplete terminal snapshot future. The queue is expected to be empty.
     */
    public void onExecutionStarted() {
        snapshotInProgress = false;
        assert snapshotQueue.isEmpty() : "snapshotQueue not empty";
        terminalSnapshotFuture = new CompletableFuture<>();
    }

    /**
     * Fails the futures of all requests still waiting in the queue with a
     * {@link JetException} and empties the queue.
     */
    public void onExecutionTerminated() {
        for (Tuple3<String, Boolean, CompletableFuture<Void>> request : snapshotQueue) {
            CompletableFuture<Void> future = request.f2();
            if (future != null) {
                future.completeExceptionally(new JetException("Execution completed before snapshot executed"));
            }
        }
        snapshotQueue.clear();
    }

    /** Returns the logger this context writes to. */
    public ILogger logger() {
        return logger;
    }
}
