package org.apache.flink.runtime.checkpoint.channel;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.class */
final class ChannelStateWriteRequestDispatcherImpl implements ChannelStateWriteRequestDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriteRequestDispatcherImpl.class);
    private final Map<Long, ChannelStateCheckpointWriter> writers = new HashMap();
    private final CheckpointStorageWorkerView streamFactoryResolver;
    private final ChannelStateSerializer serializer;
    private final int subtaskIndex;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelStateWriteRequestDispatcherImpl(int i, CheckpointStorageWorkerView checkpointStorageWorkerView, ChannelStateSerializer channelStateSerializer) {
        this.subtaskIndex = i;
        this.streamFactoryResolver = (CheckpointStorageWorkerView) Preconditions.checkNotNull(checkpointStorageWorkerView);
        this.serializer = (ChannelStateSerializer) Preconditions.checkNotNull(channelStateSerializer);
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcher
    public void dispatch(ChannelStateWriteRequest channelStateWriteRequest) throws Exception {
        LOG.trace("process {}", channelStateWriteRequest);
        try {
            dispatchInternal(channelStateWriteRequest);
        } catch (Exception e) {
            try {
                channelStateWriteRequest.cancel(e);
            } catch (Exception e2) {
                e.addSuppressed(e2);
            }
            throw e;
        }
    }

    private void dispatchInternal(ChannelStateWriteRequest channelStateWriteRequest) throws Exception {
        if (channelStateWriteRequest instanceof CheckpointStartRequest) {
            Preconditions.checkState(!this.writers.containsKey(Long.valueOf(channelStateWriteRequest.getCheckpointId())), "writer not found for request " + channelStateWriteRequest);
            this.writers.put(Long.valueOf(channelStateWriteRequest.getCheckpointId()), buildWriter((CheckpointStartRequest) channelStateWriteRequest));
        } else {
            if (!(channelStateWriteRequest instanceof CheckpointInProgressRequest)) {
                throw new IllegalArgumentException("unknown request type: " + channelStateWriteRequest);
            }
            ChannelStateCheckpointWriter channelStateCheckpointWriter = this.writers.get(Long.valueOf(channelStateWriteRequest.getCheckpointId()));
            CheckpointInProgressRequest checkpointInProgressRequest = (CheckpointInProgressRequest) channelStateWriteRequest;
            if (channelStateCheckpointWriter == null) {
                checkpointInProgressRequest.onWriterMissing();
            } else {
                checkpointInProgressRequest.execute(channelStateCheckpointWriter);
            }
        }
    }

    private ChannelStateCheckpointWriter buildWriter(CheckpointStartRequest checkpointStartRequest) throws Exception {
        return new ChannelStateCheckpointWriter(this.subtaskIndex, checkpointStartRequest, this.streamFactoryResolver.resolveCheckpointStorageLocation(checkpointStartRequest.getCheckpointId(), checkpointStartRequest.getLocationReference()), this.serializer, () -> {
            this.writers.remove(Long.valueOf(checkpointStartRequest.getCheckpointId()));
        });
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcher
    public void fail(Throwable th) {
        Iterator<ChannelStateCheckpointWriter> it = this.writers.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().fail(th);
            } catch (Exception e) {
                LOG.warn("unable to fail write channel state writer", th);
            }
        }
        this.writers.clear();
    }
}
