package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.impl.SnapshotRepository;
import com.hazelcast.jet.impl.util.AsyncMapWriter;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.NodeEngine;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/StoreSnapshotTasklet.class */
public class StoreSnapshotTasklet implements Tasklet {
    long pendingSnapshotId;
    private final long jobId;
    private final InboundEdgeStream inboundEdgeStream;
    private final SnapshotContext snapshotContext;
    private final AsyncMapWriter mapWriter;
    private final boolean isHigherPrioritySource;
    private final String vertexName;
    private final ILogger logger;
    private boolean hasReachedBarrier;
    private boolean inputIsDone;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final ProgressTracker progTracker = new ProgressTracker();
    private final AtomicInteger numActiveFlushes = new AtomicInteger();
    private State state = State.DRAIN;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/StoreSnapshotTasklet$State.class */
    public enum State {
        DRAIN,
        FLUSH,
        REACHED_BARRIER,
        DONE
    }

    public StoreSnapshotTasklet(SnapshotContext snapshotContext, long j, InboundEdgeStream inboundEdgeStream, NodeEngine nodeEngine, String str, boolean z) {
        this.snapshotContext = snapshotContext;
        this.jobId = j;
        this.inboundEdgeStream = inboundEdgeStream;
        this.vertexName = str;
        this.isHigherPrioritySource = z;
        this.mapWriter = new AsyncMapWriter(nodeEngine);
        this.pendingSnapshotId = snapshotContext.lastSnapshotId() + 1;
        this.mapWriter.setMapName(currMapName());
        this.logger = nodeEngine.getLogger(StoreSnapshotTasklet.class + "." + str + "#snapshot");
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.hazelcast.jet.impl.execution.Tasklet, java.util.concurrent.Callable
    @Nonnull
    public ProgressState call() {
        this.progTracker.reset();
        stateMachineStep();
        return this.progTracker.toProgressState();
    }

    private void stateMachineStep() {
        switch (this.state) {
            case DRAIN:
                this.progTracker.notDone();
                ProgressState drainTo = this.inboundEdgeStream.drainTo(obj -> {
                    if (!(obj instanceof SnapshotBarrier)) {
                        this.mapWriter.put((Map.Entry) obj);
                        return;
                    }
                    SnapshotBarrier snapshotBarrier = (SnapshotBarrier) obj;
                    if (!$assertionsDisabled && this.pendingSnapshotId != snapshotBarrier.snapshotId()) {
                        throw new AssertionError("Unexpected barrier, expected was " + this.pendingSnapshotId + ", but barrier was " + snapshotBarrier.snapshotId() + ", this=" + this);
                    }
                    this.hasReachedBarrier = true;
                });
                if (drainTo.isDone()) {
                    this.inputIsDone = true;
                }
                if (drainTo.isMadeProgress()) {
                    this.progTracker.madeProgress();
                    this.state = State.FLUSH;
                    stateMachineStep();
                    return;
                }
                return;
            case FLUSH:
                this.progTracker.notDone();
                CompletableFuture<Void> completableFuture = new CompletableFuture<>();
                completableFuture.whenComplete(ExceptionUtil.withTryCatch(this.logger, (r5, th) -> {
                    if (th != null) {
                        this.logger.severe("Error writing to snapshot map '" + currMapName() + "'", th);
                        this.snapshotContext.reportError(th);
                    }
                    this.numActiveFlushes.decrementAndGet();
                }));
                if (this.mapWriter.tryFlushAsync(completableFuture)) {
                    this.progTracker.madeProgress();
                    this.numActiveFlushes.incrementAndGet();
                    this.state = this.inputIsDone ? State.DONE : this.hasReachedBarrier ? State.REACHED_BARRIER : State.DRAIN;
                    return;
                }
                return;
            case REACHED_BARRIER:
                this.progTracker.notDone();
                if (this.numActiveFlushes.get() == 0) {
                    this.snapshotContext.snapshotDoneForTasklet();
                    this.pendingSnapshotId++;
                    this.mapWriter.setMapName(currMapName());
                    this.state = this.inputIsDone ? State.DONE : State.DRAIN;
                    this.hasReachedBarrier = false;
                    return;
                }
                return;
            case DONE:
                if (this.numActiveFlushes.get() != 0) {
                    this.progTracker.notDone();
                }
                this.snapshotContext.taskletDone(this.pendingSnapshotId - 1, this.isHigherPrioritySource);
                return;
            default:
                throw new JetException("Unexpected state: " + this.state);
        }
    }

    String currMapName() {
        return SnapshotRepository.snapshotDataMapName(this.jobId, this.pendingSnapshotId, this.vertexName);
    }

    public String toString() {
        return StoreSnapshotTasklet.class.getSimpleName() + '{' + this.vertexName + '}';
    }

    static {
        $assertionsDisabled = !StoreSnapshotTasklet.class.desiredAssertionStatus();
    }
}
