package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.RefCountingFSDataInputStream;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.AbstractChannelStateHandle;
import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ChannelStateReader} implementation that keeps one {@link ChannelStateStreamReader} per
 * input channel and per result subpartition found in a {@link TaskStateSnapshot}.
 *
 * <p>Readers are created eagerly in the constructor and are released (and the lookup tables
 * emptied) by {@link #close()}. As the annotation states, instances are not thread-safe.
 */
@Internal
@NotThreadSafe
public class ChannelStateReaderImpl implements ChannelStateReader {
    private static final Logger log = LoggerFactory.getLogger(ChannelStateReaderImpl.class);

    /** Readers for persisted input-channel state, keyed by the channel they belong to. */
    private final Map<InputChannelInfo, ChannelStateStreamReader> inputChannelHandleReaders;

    /** Readers for persisted result-subpartition state, keyed by the subpartition they belong to. */
    private final Map<ResultSubpartitionInfo, ChannelStateStreamReader> resultSubpartitionHandleReaders;

    /** Set by {@link #close()}; guards the read methods against use after close. */
    private boolean isClosed = false;

    /**
     * Creates a reader over the given snapshot using a {@link ChannelStateSerializerImpl}.
     *
     * @param taskStateSnapshot snapshot whose channel state handles should be readable
     */
    public ChannelStateReaderImpl(TaskStateSnapshot taskStateSnapshot) {
        this(taskStateSnapshot, new ChannelStateSerializerImpl());
    }

    /**
     * Creates a reader over the given snapshot with an explicit serializer.
     *
     * @param taskStateSnapshot snapshot whose channel state handles should be readable
     * @param channelStateSerializer serializer handed to the stream factory shared by all readers
     * @throws IllegalStateException if the snapshot contains more than one handle for the same
     *     input channel or the same result subpartition
     */
    ChannelStateReaderImpl(TaskStateSnapshot taskStateSnapshot, ChannelStateSerializer channelStateSerializer) {
        // A single factory instance is passed to every reader created below.
        RefCountingFSDataInputStream.RefCountingFSDataInputStreamFactory streamFactory =
                new RefCountingFSDataInputStream.RefCountingFSDataInputStreamFactory(channelStateSerializer);
        Map<InputChannelInfo, ChannelStateStreamReader> inputReaders = new HashMap<>();
        Map<ResultSubpartitionInfo, ChannelStateStreamReader> outputReaders = new HashMap<>();
        for (Map.Entry<OperatorID, OperatorSubtaskState> entry : taskStateSnapshot.getSubtaskStateMappings()) {
            OperatorSubtaskState subtaskState = entry.getValue();
            addReaders(inputReaders, subtaskState.getInputChannelState(), streamFactory);
            addReaders(outputReaders, subtaskState.getResultSubpartitionState(), streamFactory);
        }
        this.inputChannelHandleReaders = inputReaders;
        this.resultSubpartitionHandleReaders = outputReaders;
    }

    /**
     * Creates a {@link ChannelStateStreamReader} for every handle and stores it under the handle's
     * info key.
     *
     * @param readerByChannel target map, mutated in place
     * @param handles state handles to create readers for
     * @param streamFactory stream factory passed to each new reader
     * @throws IllegalStateException if {@code readerByChannel} already holds a reader for a handle's info
     */
    private <T> void addReaders(
            Map<T, ChannelStateStreamReader> readerByChannel,
            Collection<? extends AbstractChannelStateHandle<T>> handles,
            RefCountingFSDataInputStream.RefCountingFSDataInputStreamFactory streamFactory) {
        for (AbstractChannelStateHandle<T> handle : handles) {
            T info = handle.getInfo();
            Preconditions.checkState(!readerByChannel.containsKey(info), "multiple states exist for channel: " + info);
            readerByChannel.put(info, new ChannelStateStreamReader(handle, streamFactory));
        }
    }

    /**
     * Reports whether at least one input-channel or result-subpartition reader is registered.
     *
     * @return {@code true} if either reader map is non-empty
     */
    @Override
    public boolean hasChannelStates() {
        return !this.inputChannelHandleReaders.isEmpty() || !this.resultSubpartitionHandleReaders.isEmpty();
    }

    /**
     * Reads persisted state of the given input channel into {@code buffer}.
     *
     * @param inputChannelInfo channel whose state should be read
     * @param buffer destination buffer
     * @return the result of the channel's reader, or {@link ChannelStateReader.ReadResult#NO_MORE_DATA}
     *     if no reader is registered for the channel
     * @throws IOException propagated from the underlying reader
     * @throws IllegalStateException if this reader has been closed
     */
    @Override
    public ChannelStateReader.ReadResult readInputData(InputChannelInfo inputChannelInfo, Buffer buffer) throws IOException {
        Preconditions.checkState(!this.isClosed, "reader is closed");
        log.debug("readInputData, inputChannelInfo: {} , buffer {}", inputChannelInfo, buffer);
        ChannelStateStreamReader reader = this.inputChannelHandleReaders.get(inputChannelInfo);
        if (reader == null) {
            return ChannelStateReader.ReadResult.NO_MORE_DATA;
        }
        return reader.readInto(buffer);
    }

    /**
     * Reads persisted state of the given result subpartition into {@code bufferBuilder}.
     *
     * @param resultSubpartitionInfo subpartition whose state should be read
     * @param bufferBuilder destination buffer builder
     * @return the result of the subpartition's reader, or
     *     {@link ChannelStateReader.ReadResult#NO_MORE_DATA} if no reader is registered for it
     * @throws IOException propagated from the underlying reader
     * @throws IllegalStateException if this reader has been closed
     */
    @Override
    public ChannelStateReader.ReadResult readOutputData(ResultSubpartitionInfo resultSubpartitionInfo, BufferBuilder bufferBuilder) throws IOException {
        Preconditions.checkState(!this.isClosed, "reader is closed");
        log.debug("readOutputData, resultSubpartitionInfo: {} , bufferBuilder {}", resultSubpartitionInfo, bufferBuilder);
        ChannelStateStreamReader reader = this.resultSubpartitionHandleReaders.get(resultSubpartitionInfo);
        if (reader == null) {
            return ChannelStateReader.ReadResult.NO_MORE_DATA;
        }
        return reader.readInto(bufferBuilder);
    }

    /**
     * Marks this reader as closed, hands every registered stream reader to a {@link Closer} and
     * empties both lookup maps; the {@code Closer} is closed when the try block exits.
     *
     * @throws Exception if closing the {@code Closer} (and thereby the registered readers) fails
     */
    @Override
    public void close() throws Exception {
        // Flip the flag first so read attempts are rejected even if closing fails part-way.
        this.isClosed = true;
        try (Closer closer = Closer.create()) {
            for (Map<?, ChannelStateStreamReader> readers :
                    Arrays.asList(this.inputChannelHandleReaders, this.resultSubpartitionHandleReaders)) {
                for (ChannelStateStreamReader reader : readers.values()) {
                    closer.register(reader);
                }
                readers.clear();
            }
        }
    }
}
